Skip to content

Commit

Permalink
Merge pull request #573 from basedosdados/br_ibge_ppm
Browse files Browse the repository at this point in the history
add: br_ibge_ppm models and files
  • Loading branch information
laura-l-amaral authored May 7, 2024
2 parents 5b4be8e + ac784f4 commit 2c7f204
Show file tree
Hide file tree
Showing 17 changed files with 1,117 additions and 3 deletions.
6 changes: 3 additions & 3 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ models:
br_b3_cotacoes:
+materialized: table
+schema: br_b3_cotacoes
+post-hook:
- REVOKE `roles/bigquery.dataViewer` ON TABLE {{ this }} FROM "specialGroup:allUsers"
- GRANT `roles/bigquery.dataViewer` ON TABLE {{ this }} TO "group:[email protected]"
br_bcb_agencia:
+materialized: table
+schema: br_bcb_agencia
Expand Down Expand Up @@ -180,6 +177,9 @@ models:
br_ibge_pnadc:
+materialized: table
+schema: br_ibge_pnadc
br_ibge_ppm:
+materialized: table
+schema: br_ibge_ppm
br_inep_censo_educacao_superior:
+materialized: table
+schema: br_inep_censo_educacao_superior
Expand Down
20 changes: 20 additions & 0 deletions models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{
config(
alias="efetivo_rebanhos",
schema="br_ibge_ppm",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 1974, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf"],
)
}}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(tipo_rebanho as string) tipo_rebanho,
safe_cast(quantidade as int64) quantidade
from `basedosdados-staging.br_ibge_ppm_staging.efetivo_rebanhos` as t
where quantidade is not null
21 changes: 21 additions & 0 deletions models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{{
config(
alias="producao_aquicultura",
schema="br_ibge_ppm",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 2013, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf"],
)
}}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(produto as string) produto,
safe_cast(quantidade as int64) quantidade,
safe_cast(valor as int64) valor,
from `basedosdados-staging.br_ibge_ppm_staging.producao_aquicultura` as t
where quantidade is not null
22 changes: 22 additions & 0 deletions models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{{
config(
alias="producao_origem_animal",
schema="br_ibge_ppm",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 1974, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf"],
)
}}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(produto as string) produto,
safe_cast(unidade as string) unidade,
safe_cast(quantidade as int64) quantidade,
safe_cast(valor as int64) valor,
from `basedosdados-staging.br_ibge_ppm_staging.producao_origem_animal` as t
where quantidade is not null
20 changes: 20 additions & 0 deletions models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{
config(
alias="producao_pecuaria",
schema="br_ibge_ppm",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 1974, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf"],
)
}}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(ovinos_tosquiados as int64) ovinos_tosquiados,
safe_cast(vacas_ordenhadas as int64) vacas_ordenhadas,
from `basedosdados-staging.br_ibge_ppm_staging.producao_pecuaria` as t
where ovinos_tosquiados is not null or vacas_ordenhadas is not null
68 changes: 68 additions & 0 deletions models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
import aiohttp
import json
import glob
from tqdm.asyncio import tqdm
from aiohttp import ClientTimeout, TCPConnector
from tqdm import tqdm
from typing import Any, Dict, List

API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]"
AGREGADO = "3939" # É a tabela no SIDRA
PERIODOS = range(1974, 2022 + 1)
VARIAVEIS = ["105"] # As variáveis da tabela
NIVEL_GEOGRAFICO = "N6" # N6 = Municipal
LOCALIDADES = "all"
CLASSIFICACAO = "79" # Código pré-definido por agregado
CATEGORIAS = ["2670", "2675", "2672", "32794", "32795", "2681", "2677"
"32796", "32793", "2680"] # Produtos
ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")]
ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS]

async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
"""
Faz uma requisição GET à API e retorna a resposta em formato JSON.
Parâmetros:
- session (aiohttp.ClientSession): A sessão do cliente aiohttp.
- url (str): A URL da API para a qual a requisição será feita.
Retorna:
- Dict[str, Any]: A resposta da API em formato JSON.
"""
async with session.get(url) as response:
return await response.json()

async def main(years: List[int], variables: List[str], categories: List[str]) -> None:
"""
Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON.
Parâmetros:
- years (List[int]): Lista de anos para os quais os dados serão consultados.
- variables (List[str]): Lista de variáveis da tabela a serem consultadas.
- categories (List[str]): Lista de categorias a serem consultadas.
Retorna:
- None
"""
for year in years:
print(f'Consultando dados do ano: {year}')
async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session:
tasks = []
for variable in variables:
for category in categories:
url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category)
task = fetch(session, url)
tasks.append(asyncio.ensure_future(task))
responses = []
for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
try:
response = await future
responses.append(response)
except asyncio.TimeoutError:
print(f"Request timed out for {url}")
with open(f'../json/{year}.json', 'a') as f:
json.dump(responses, f)

if __name__ == "__main__":
asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS))
109 changes: 109 additions & 0 deletions models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import json
import numpy as np
import os
import re
from collections import OrderedDict
import pandas as pd
import pyarrow as pa
from pathlib import Path

def parse_file(file_path):
dict_list = []

with open(file_path) as f:
json_list = json.load(f)

for j in json_list:
temp_od = OrderedDict()
temp_variavel = j[0]["variavel"]
for r in j[0]["resultados"]:
temp_caracteristica = list(r["classificacoes"][0]["categoria"].values())[0]
for s in r["series"]:
temp_ano = list(s["serie"].keys())[0]
temp_valor = list(s["serie"].values())[0]
temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip()
temp_id_municipio = s["localidade"]["id"]

temp_od["ano"] = temp_ano
temp_od["sigla_uf"] = temp_sigla_uf
temp_od["id_municipio"] = temp_id_municipio
temp_od["tipo_rebanho"] = temp_caracteristica
temp_od[temp_variavel] = temp_valor

dict_list.append(dict(temp_od))
temp_od.clear()
return dict_list

def join_dicts_by_keys(dict_list, keys):
result = {}
for d in dict_list:
key_tuple = tuple(d[k] for k in keys)
if key_tuple not in result:
result[key_tuple] = d
else:
result[key_tuple].update(d)
return list(result.values())

def create_raw_df(dict_list):
return pd.DataFrame(dict_list, dtype=str)

def rename_columns(dataframe):
renamed_dataframe = dataframe.rename(columns={
"Tipo de rebanho": "tipo_rebanho",
"Efetivo dos rebanhos": "quantidade"
})
return renamed_dataframe

def treat_columns(dataframe):
dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "tipo_rebanho",
"quantidade"]]
COLUNAS_PARA_TRATAR = ["quantidade"]

for coluna in COLUNAS_PARA_TRATAR:
dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x)
dataframe[coluna] = dataframe[coluna].astype("Int64")
return dataframe

def get_existing_years(directory):
root_dir = Path(directory)
year_dirs = root_dir.glob('ano=*')
years = set()
for year_dir in year_dirs:
match = re.match(r'ano=(\d+)', year_dir.name)
if match:
year = int(match.group(1))
years.add(year)

return sorted(list(years))

if __name__ == '__main__':
ANOS_TRANSFORMADOS = get_existing_years("../parquet")
ARQUIVOS_JSON = list(Path('../json/').glob('*.json'))
JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS]

for path in JSON_FALTANTES:
print(f"Criando dict_list com base no arquivo: {path}...")
dict_list = parse_file(path)
print("Concatendo as colunas do dict_list...")
dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "tipo_rebanho"])
print("Criando DataFrame a partir do dict_list...")
df = create_raw_df(dict_list)
print("Renomeando as colunas do DataFrame...")
df = rename_columns(df)
print("Tratando as colunas do DataFrame...")
df = treat_columns(df)
print("Transformações finalizadas!")
temp_ano = df["ano"].max()
print("Deletando a coluna ano para possibilitar o particionamento...")
df.drop(columns=["ano"], inplace=True)
print("Transformações finalizadas!")
temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet"
print(f"Exportando o DataFrame particionado em {temp_export_file_path}...")
os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True)
temp_schema = pa.schema([("sigla_uf", pa.string()),
("id_municipio", pa.string()),
("tipo_rebanho", pa.string()),
("quantidade", pa.int64())])
df.to_parquet(temp_export_file_path, schema=temp_schema, index=False)
del(df) # Liberando memória
print()
70 changes: 70 additions & 0 deletions models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
import aiohttp
import json
import glob
from tqdm.asyncio import tqdm
from aiohttp import ClientTimeout, TCPConnector
from typing import Any, Dict, List

API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]"
AGREGADO = "3940" # É a tabela no SIDRA
PERIODOS = range(2013, 2022 + 1)
VARIAVEIS = ["4146", "215"] # As variáveis da tabela
NIVEL_GEOGRAFICO = "N6" # N6 = Municipal
LOCALIDADES = "all"
CLASSIFICACAO = "654" # Código pré-definido por agregado
CATEGORIAS = ["32861", "32865", "32866", "32867", "32868", "32869", "32870",
"32871", "32872", "32873", "32874", "32875", "32876", "32877",
"32878", "32879", "32880", "32881", "32886", "32887", "32888",
"32889", "32890", "32891"] # Produtos
ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")]
ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS]

async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
"""
Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada.
Parâmetros:
- session (aiohttp.ClientSession): A sessão do cliente aiohttp.
- url (str): A URL da API para a qual a requisição será feita.
Retorna:
- Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada.
"""
async with session.get(url) as response:
data = await response.json()
return {'url': url, 'data': data}

async def main(years: List[int], variables: List[str], categories: List[str]) -> None:
"""
Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON.
Parâmetros:
- years (List[int]): Lista de anos para os quais os dados serão consultados.
- variables (List[str]): Lista de variáveis da tabela a serem consultadas.
- categories (List[str]): Lista de categorias a serem consultadas.
Retorna:
- None
"""
for year in years:
print(f'Consultando dados do ano: {year}')
async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session:
tasks = []
for variable in variables:
for category in categories:
url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category)
task = fetch(session, url)
tasks.append(asyncio.ensure_future(task))
responses = []
for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
try:
response = await future
responses.append(response)
except asyncio.TimeoutError:
print(f"Request timed out for {url}")
with open(f'../json/{year}.json', 'a') as f:
json.dump(responses, f)

if __name__ == "__main__":
asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS))
Loading

0 comments on commit 2c7f204

Please sign in to comment.