diff --git a/dbt_project.yml b/dbt_project.yml index d02dca55..9e4e1f5d 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -63,9 +63,6 @@ models: br_b3_cotacoes: +materialized: table +schema: br_b3_cotacoes - +post-hook: - - REVOKE `roles/bigquery.dataViewer` ON TABLE {{ this }} FROM "specialGroup:allUsers" - - GRANT `roles/bigquery.dataViewer` ON TABLE {{ this }} TO "group:bd-pro@basedosdados.org" br_bcb_agencia: +materialized: table +schema: br_bcb_agencia @@ -180,6 +177,9 @@ models: br_ibge_pnadc: +materialized: table +schema: br_ibge_pnadc + br_ibge_ppm: + +materialized: table + +schema: br_ibge_ppm br_inep_censo_educacao_superior: +materialized: table +schema: br_inep_censo_educacao_superior diff --git a/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql b/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql new file mode 100644 index 00000000..d2261fbb --- /dev/null +++ b/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql @@ -0,0 +1,20 @@ +{{ + config( + alias="efetivo_rebanhos", + schema="br_ibge_ppm", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 1974, "end": 2022, "interval": 1}, + }, + cluster_by=["sigla_uf"], + ) +}} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(tipo_rebanho as string) tipo_rebanho, + safe_cast(quantidade as int64) quantidade +from `basedosdados-staging.br_ibge_ppm_staging.efetivo_rebanhos` as t +where quantidade is not null diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql new file mode 100644 index 00000000..5f885c72 --- /dev/null +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql @@ -0,0 +1,21 @@ +{{ + config( + alias="producao_aquicultura", + schema="br_ibge_ppm", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 2013, "end": 2022, "interval": 1}, + }, + cluster_by=["sigla_uf"], + ) +}} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(produto as string) produto, + safe_cast(quantidade as int64) quantidade, + safe_cast(valor as int64) valor, +from `basedosdados-staging.br_ibge_ppm_staging.producao_aquicultura` as t +where quantidade is not null diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql new file mode 100644 index 00000000..2b7e701d --- /dev/null +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql @@ -0,0 +1,22 @@ +{{ + config( + alias="producao_origem_animal", + schema="br_ibge_ppm", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 1974, "end": 2022, "interval": 1}, + }, + cluster_by=["sigla_uf"], + ) +}} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(produto as string) produto, + safe_cast(unidade as string) unidade, + safe_cast(quantidade as int64) quantidade, + safe_cast(valor as int64) valor, +from `basedosdados-staging.br_ibge_ppm_staging.producao_origem_animal` as t +where quantidade is not null diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql new file mode 100644 index 00000000..50c50f89 --- /dev/null +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql @@ -0,0 +1,20 @@ +{{ + config( + alias="producao_pecuaria", + schema="br_ibge_ppm", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 1974, "end": 2022, "interval": 1}, + }, + cluster_by=["sigla_uf"], + ) +}} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(ovinos_tosquiados as int64) ovinos_tosquiados, + safe_cast(vacas_ordenhadas as int64) vacas_ordenhadas, +from `basedosdados-staging.br_ibge_ppm_staging.producao_pecuaria` as t +where ovinos_tosquiados is not null or vacas_ordenhadas is not null diff --git a/models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py b/models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py new file mode 100644 index 00000000..982c3be0 --- /dev/null +++ b/models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py @@ -0,0 +1,68 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" +AGREGADO = "3939" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["105"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +CLASSIFICACAO = "79" # Código pré-definido por agregado +CATEGORIAS = ["2670", "2675", "2672", "32794", "32795", "2681", "2677" + "32796", "32793", "2680"] # Produtos +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON. + """ + async with session.get(url) as response: + return await response.json() + +async def main(years: List[int], variables: List[str], categories: List[str]) -> None: + """ + Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + - categories (List[str]): Lista de categorias a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + for category in categories: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS)) diff --git a/models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py b/models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py new file mode 100644 index 00000000..83abaf59 --- /dev/null +++ b/models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py @@ -0,0 +1,109 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j[0]["variavel"] + for r in j[0]["resultados"]: + temp_caracteristica = list(r["classificacoes"][0]["categoria"].values())[0] + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od["tipo_rebanho"] = temp_caracteristica + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def join_dicts_by_keys(dict_list, keys): + result = {} + for d in dict_list: + key_tuple = tuple(d[k] for k in keys) + if key_tuple not in result: + result[key_tuple] = d + else: + result[key_tuple].update(d) + return list(result.values()) + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Tipo de rebanho": "tipo_rebanho", + "Efetivo dos rebanhos": "quantidade" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "tipo_rebanho", + "quantidade"]] + COLUNAS_PARA_TRATAR = ["quantidade"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Concatendo as colunas do dict_list...") + dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "tipo_rebanho"]) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("tipo_rebanho", pa.string()), + ("quantidade", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py b/models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py new file mode 100644 index 00000000..3dfca90c --- /dev/null +++ b/models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py @@ -0,0 +1,70 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" +AGREGADO = "3940" # É a tabela no SIDRA +PERIODOS = range(2013, 2022 + 1) +VARIAVEIS = ["4146", "215"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +CLASSIFICACAO = "654" # Código pré-definido por agregado +CATEGORIAS = ["32861", "32865", "32866", "32867", "32868", "32869", "32870", + "32871", "32872", "32873", "32874", "32875", "32876", "32877", + "32878", "32879", "32880", "32881", "32886", "32887", "32888", + "32889", "32890", "32891"] # Produtos +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada. + """ + async with session.get(url) as response: + data = await response.json() + return {'url': url, 'data': data} + +async def main(years: List[int], variables: List[str], categories: List[str]) -> None: + """ + Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + - categories (List[str]): Lista de categorias a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + for category in categories: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS)) diff --git a/models/br_ibge_ppm/producao_aquicultura/code/json_to_parquet.py b/models/br_ibge_ppm/producao_aquicultura/code/json_to_parquet.py new file mode 100644 index 00000000..9104cbd6 --- /dev/null +++ b/models/br_ibge_ppm/producao_aquicultura/code/json_to_parquet.py @@ -0,0 +1,111 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j["data"][0]["variavel"] + for r in j["data"][0]["resultados"]: + temp_caracteristica = list(r["classificacoes"][0]["categoria"].values())[0] + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od["produto"] = temp_caracteristica + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def join_dicts_by_keys(dict_list, keys): + result = {} + for d in dict_list: + key_tuple = tuple(d[k] for k in keys) + if key_tuple not in result: + result[key_tuple] = d + else: + result[key_tuple].update(d) + return list(result.values()) + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Tipo de produto da aquicultura": "produto", + "Produção da aquicultura": "quantidade", + "Valor da produção": "valor" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "produto", + "quantidade", "valor"]] + COLUNAS_PARA_TRATAR = ["quantidade", "valor"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Concatendo as colunas do dict_list...") + dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "produto"]) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("produto", pa.string()), + ("quantidade", pa.int64()), + ("valor", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_origem_animal/code/api_to_json.py b/models/br_ibge_ppm/producao_origem_animal/code/api_to_json.py new file mode 100644 index 00000000..a1614e86 --- /dev/null +++ b/models/br_ibge_ppm/producao_origem_animal/code/api_to_json.py @@ -0,0 +1,68 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" +AGREGADO = "74" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["106", "215"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +CLASSIFICACAO = "80" # Código pré-definido por agregado +CATEGORIAS = ["2682", "2683", "2684", "2685", "2686", "2687"] # Produtos +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada. + """ + async with session.get(url) as response: + data = await response.json() + return {'url': url, 'data': data} + +async def main(years: List[int], variables: List[str], categories: List[str]) -> None: + """ + Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + - categories (List[str]): Lista de categorias a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + for category in categories: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS)) diff --git a/models/br_ibge_ppm/producao_origem_animal/code/json_to_parquet.py b/models/br_ibge_ppm/producao_origem_animal/code/json_to_parquet.py new file mode 100644 index 00000000..cec28234 --- /dev/null +++ b/models/br_ibge_ppm/producao_origem_animal/code/json_to_parquet.py @@ -0,0 +1,139 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j["data"][0]["variavel"] + temp_unidade = j["data"][0]["unidade"] + for r in j["data"][0]["resultados"]: + temp_caracteristica = list(r["classificacoes"][0]["categoria"].values())[0] + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od["produto"] = temp_caracteristica + if temp_unidade not in [ + "Mil Cruzeiros", "Mil Cruzados", "Mil Cruzados Novos", + "Mil Cruzeiros Reais", "Mil Reais" + ]: + temp_od["unidade"] = temp_unidade + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def join_dicts_by_keys(dict_list, keys): + result = {} + for d in dict_list: + key_tuple = tuple(d[k] for k in keys) + if key_tuple not in result: + result[key_tuple] = d + else: + result[key_tuple].update(d) + return list(result.values()) + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Produção de origem animal": "quantidade", + "Valor da produção": "valor" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "produto", + "unidade", "quantidade", "valor"]] + COLUNAS_PARA_TRATAR = ["ano", "quantidade", "valor"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def currency_fix(row): + """ + Valor da produção (Mil Cruzeiros [1974 a 1985, 1990 a 1992], Mil Cruzados [1986 a 1988], + Mil Cruzados Novos [1989], Mil Cruzeiros Reais [1993], Mil Reais [1994 a 2022]) + Verificado em http://www.igf.com.br/calculadoras/conversor/conversor.htm + """ + + if 1974 <= row["ano"] <= 1985: + return row["valor"] / (1000**4 * 2.75) + elif 1986 <= row["ano"] <= 1988: + return row["valor"] / (1000**3 * 2.75) + elif row["ano"] == 1989: + return row["valor"] / (1000**2 * 2.75) + elif 1990 <= row["ano"] <= 1992: + return row["valor"] / (1000**2 * 2.75) + elif row["ano"] == 1993: + return row["valor"] / (1000 * 2.75) + else: + return row["valor"] + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Concatendo as colunas do dict_list...") + dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "produto"]) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Aplicando a correção nominal retroativa da moeda... Impacto: valor") + df["valor"] = df.apply(currency_fix, axis=1) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("produto", pa.string()), + ("unidade", pa.string()), + ("quantidade", pa.int64()), + ("valor", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_pecuaria/code/join_parquet.py b/models/br_ibge_ppm/producao_pecuaria/code/join_parquet.py new file mode 100644 index 00000000..e3100c0b --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/code/join_parquet.py @@ -0,0 +1,22 @@ +import pandas as pd +import pyarrow as pa +import os + +ANOS = range(1974, 2022 + 1) + +for temp_ano in ANOS: + df_ovinos = pd.read_parquet(f"../source/ovinos_tosquiados/parquet/ano={temp_ano}/data.parquet") + df_vacas = pd.read_parquet(f"../source/vacas_ordenhadas/parquet/ano={temp_ano}/data.parquet") + + print(f"Criando o DataFrame com os dados consolidados, referente ao ano de {temp_ano}...") + df_join = pd.merge(df_ovinos, df_vacas, on=["sigla_uf", "id_municipio"]) + + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("ovinos_tosquiados", pa.int64()), + ("vacas_ordenhadas", pa.int64())]) + df_join.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/api_to_json.py b/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/api_to_json.py new file mode 100644 index 00000000..f0532f7f --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/api_to_json.py @@ -0,0 +1,64 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]" +AGREGADO = "95" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["108"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada. + """ + async with session.get(url) as response: + data = await response.json() + return {'url': url, 'data': data} + +async def main(years: List[int], variables: List[str]) -> None: + """ + Faz requisições para a API para cada ano e variável, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS)) diff --git a/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/json_to_parquet.py b/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/json_to_parquet.py new file mode 100644 index 00000000..8a70b4a6 --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/json_to_parquet.py @@ -0,0 +1,92 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j["data"][0]["variavel"] + for r in j["data"][0]["resultados"]: + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Ovinos tosquiados nos estabelecimentos agropecu\u00e1rios": "ovinos_tosquiados" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "ovinos_tosquiados"]] + COLUNAS_PARA_TRATAR = ["ovinos_tosquiados"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("ovinos_tosquiados", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/api_to_json.py b/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/api_to_json.py new file mode 100644 index 00000000..b1f021f7 --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/api_to_json.py @@ -0,0 +1,64 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]" +AGREGADO = "94" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["107"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada. + """ + async with session.get(url) as response: + data = await response.json() + return {'url': url, 'data': data} + +async def main(years: List[int], variables: List[str]) -> None: + """ + Faz requisições para a API para cada ano e variável, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS)) diff --git a/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/json_to_parquet.py b/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/json_to_parquet.py new file mode 100644 index 00000000..bf917ccd --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/json_to_parquet.py @@ -0,0 +1,92 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j["data"][0]["variavel"] + for r in j["data"][0]["resultados"]: + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Vacas ordenhadas": "vacas_ordenhadas" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "vacas_ordenhadas"]] + COLUNAS_PARA_TRATAR = ["vacas_ordenhadas"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("vacas_ordenhadas", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/schema.yml b/models/br_ibge_ppm/schema.yml new file mode 100644 index 00000000..61e87a26 --- /dev/null +++ b/models/br_ibge_ppm/schema.yml @@ -0,0 +1,132 @@ +--- +version: 2 +models: + - name: br_ibge_ppm__producao_pecuaria + description: Dados de pecuária contendo o número de vacas ordenhadas e ovinos + tosquiados. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - [ano, sigla_uf, id_municipio, produto] + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: ovinos_tosquiados + description: Número de ovinos tosquiados + - name: vacas_ordenhadas + description: Número de vacas ordenhadas + - name: br_ibge_ppm__efetivo_rebanhos + description: Efetivo dos rebanhos (cabeça), por tipo de rebanho. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: [insert unique keys here] + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: tipo_rebanho + description: Tipo de rebanho + - name: quantidade + description: Quantidade de animais + - name: br_ibge_ppm__producao_origem_animal + description: Produção de origem animal, por tipo de produto e valor da produção. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: [insert unique keys here] + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: produto + description: Produto + - name: unidade + description: Unidade de medida do produto + - name: quantidade + description: Quantidade da produção + - name: valor + description: Valor da produção + - name: br_ibge_ppm__producao_aquicultura + description: Produção da aquicultura, de acordo com a quantidade produzida em + quilogramas e valor da produção em mil reais. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: [insert unique keys here] + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: produto + description: Produto + - name: quantidade + description: Quantidade da produção + - name: valor + description: Valor da produção