From d636e0818bad0bcb996c9e80c300b2c11f226711 Mon Sep 17 00:00:00 2001 From: Jean Tozzi Date: Fri, 3 May 2024 10:52:40 -0300 Subject: [PATCH 1/3] add: br_ibge_ppm models and files --- dbt_project.yml | 3 + .../br_ibge_ppm__efetivo_rebanhos.sql | 21 +++ .../br_ibge_ppm__producao_aquicultura.sql | 21 +++ .../br_ibge_ppm__producao_origem_animal.sql | 22 +++ .../br_ibge_ppm__producao_pecuaria.sql | 20 +++ .../efetivo_rebanhos/code/api_to_json.py | 68 +++++++++ .../efetivo_rebanhos/code/json_to_parquet.py | 109 ++++++++++++++ .../producao_aquicultura/code/api_to_json.py | 70 +++++++++ .../code/json_to_parquet.py | 111 ++++++++++++++ .../code/api_to_json.py | 68 +++++++++ .../code/json_to_parquet.py | 139 ++++++++++++++++++ .../producao_pecuaria/code/join_parquet.py | 22 +++ .../ovinos_tosquiados/code/api_to_json.py | 64 ++++++++ .../ovinos_tosquiados/code/json_to_parquet.py | 92 ++++++++++++ .../vacas_ordenhadas/code/api_to_json.py | 64 ++++++++ .../vacas_ordenhadas/code/json_to_parquet.py | 92 ++++++++++++ models/br_ibge_ppm/schema.yml | 133 +++++++++++++++++ 17 files changed, 1119 insertions(+) create mode 100644 models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql create mode 100644 models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql create mode 100644 models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql create mode 100644 models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql create mode 100644 models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py create mode 100644 models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py create mode 100644 models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py create mode 100644 models/br_ibge_ppm/producao_aquicultura/code/json_to_parquet.py create mode 100644 models/br_ibge_ppm/producao_origem_animal/code/api_to_json.py create mode 100644 models/br_ibge_ppm/producao_origem_animal/code/json_to_parquet.py create mode 100644 models/br_ibge_ppm/producao_pecuaria/code/join_parquet.py create mode 100644 models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/api_to_json.py create mode 100644 models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/json_to_parquet.py create mode 100644 models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/api_to_json.py create mode 100644 models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/json_to_parquet.py create mode 100644 models/br_ibge_ppm/schema.yml diff --git a/dbt_project.yml b/dbt_project.yml index a8f21f3c..ec7612e8 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -180,6 +180,9 @@ models: br_ibge_pnadc: +materialized: table +schema: br_ibge_pnadc + br_ibge_ppm: + +materialized: table + +schema: br_ibge_ppm br_inep_censo_educacao_superior: +materialized: table +schema: br_inep_censo_educacao_superior diff --git a/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql b/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql new file mode 100644 index 00000000..b4e07171 --- /dev/null +++ b/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql @@ -0,0 +1,21 @@ +{{ + config( + alias="efetivo_rebanhos", + schema="br_ibge_ppm", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 1974, "end": 2022, "interval": 1}, + }, + cluster_by=["sigla_uf", "id_municipio"], + ) +}} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(tipo_rebanho as string) tipo_rebanho, + safe_cast(quantidade as int64) quantidade +from `basedosdados-staging.br_ibge_ppm_staging.efetivo_rebanhos` as t +where quantidade is not null + diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql new file mode 100644 index 00000000..f9aa3698 --- /dev/null +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql @@ -0,0 +1,21 @@ +{{ + config( + alias="producao_aquicultura", + schema="br_ibge_ppm", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 2013, "end": 2022, "interval": 1}, + }, + cluster_by=["sigla_uf", "id_municipio"], + ) +}} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(produto as string) produto, + safe_cast(quantidade as int64) quantidade, + safe_cast(valor as int64) valor, +from `basedosdados-staging.br_ibge_ppm_staging.producao_aquicultura` as t +where quantidade is not null diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql new file mode 100644 index 00000000..446230c7 --- /dev/null +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql @@ -0,0 +1,22 @@ +{{ + config( + alias="producao_origem_animal", + schema="br_ibge_ppm", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 1974, "end": 2022, "interval": 1}, + }, + cluster_by=["sigla_uf", "id_municipio"], + ) +}} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(produto as string) produto, + safe_cast(unidade as string) unidade, + safe_cast(quantidade as int64) quantidade, + safe_cast(valor as int64) valor, +from `basedosdados-staging.br_ibge_ppm_staging.producao_origem_animal` as t +where quantidade is not null diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql new file mode 100644 index 00000000..c71a328b --- /dev/null +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql @@ -0,0 +1,20 @@ +{{ + config( + alias="producao_pecuaria", + schema="br_ibge_ppm", + partition_by={ + "field": "ano", + "data_type": "int64", + "range": {"start": 1974, "end": 2022, "interval": 1}, + }, + cluster_by=["sigla_uf", "id_municipio"], + ) +}} +select + safe_cast(ano as int64) ano, + safe_cast(sigla_uf as string) sigla_uf, + safe_cast(id_municipio as string) id_municipio, + safe_cast(ovinos_tosquiados as int64) ovinos_tosquiados, + safe_cast(vacas_ordenhadas as int64) vacas_ordenhadas, +from `basedosdados-staging.br_ibge_ppm_staging.producao_pecuaria` as t +where ovinos_tosquiados is not null or vacas_ordenhadas is not null diff --git a/models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py b/models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py new file mode 100644 index 00000000..982c3be0 --- /dev/null +++ b/models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py @@ -0,0 +1,68 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" +AGREGADO = "3939" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["105"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +CLASSIFICACAO = "79" # Código pré-definido por agregado +CATEGORIAS = ["2670", "2675", "2672", "32794", "32795", "2681", "2677" + "32796", "32793", "2680"] # Produtos +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON. + """ + async with session.get(url) as response: + return await response.json() + +async def main(years: List[int], variables: List[str], categories: List[str]) -> None: + """ + Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + - categories (List[str]): Lista de categorias a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + for category in categories: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS)) diff --git a/models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py b/models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py new file mode 100644 index 00000000..83abaf59 --- /dev/null +++ b/models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py @@ -0,0 +1,109 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j[0]["variavel"] + for r in j[0]["resultados"]: + temp_caracteristica = list(r["classificacoes"][0]["categoria"].values())[0] + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od["tipo_rebanho"] = temp_caracteristica + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def join_dicts_by_keys(dict_list, keys): + result = {} + for d in dict_list: + key_tuple = tuple(d[k] for k in keys) + if key_tuple not in result: + result[key_tuple] = d + else: + result[key_tuple].update(d) + return list(result.values()) + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Tipo de rebanho": "tipo_rebanho", + "Efetivo dos rebanhos": "quantidade" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "tipo_rebanho", + "quantidade"]] + COLUNAS_PARA_TRATAR = ["quantidade"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Concatendo as colunas do dict_list...") + dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "tipo_rebanho"]) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("tipo_rebanho", pa.string()), + ("quantidade", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py b/models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py new file mode 100644 index 00000000..3dfca90c --- /dev/null +++ b/models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py @@ -0,0 +1,70 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" +AGREGADO = "3940" # É a tabela no SIDRA +PERIODOS = range(2013, 2022 + 1) +VARIAVEIS = ["4146", "215"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +CLASSIFICACAO = "654" # Código pré-definido por agregado +CATEGORIAS = ["32861", "32865", "32866", "32867", "32868", "32869", "32870", + "32871", "32872", "32873", "32874", "32875", "32876", "32877", + "32878", "32879", "32880", "32881", "32886", "32887", "32888", + "32889", "32890", "32891"] # Produtos +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada. + """ + async with session.get(url) as response: + data = await response.json() + return {'url': url, 'data': data} + +async def main(years: List[int], variables: List[str], categories: List[str]) -> None: + """ + Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + - categories (List[str]): Lista de categorias a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + for category in categories: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS)) diff --git a/models/br_ibge_ppm/producao_aquicultura/code/json_to_parquet.py b/models/br_ibge_ppm/producao_aquicultura/code/json_to_parquet.py new file mode 100644 index 00000000..9104cbd6 --- /dev/null +++ b/models/br_ibge_ppm/producao_aquicultura/code/json_to_parquet.py @@ -0,0 +1,111 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j["data"][0]["variavel"] + for r in j["data"][0]["resultados"]: + temp_caracteristica = list(r["classificacoes"][0]["categoria"].values())[0] + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od["produto"] = temp_caracteristica + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def join_dicts_by_keys(dict_list, keys): + result = {} + for d in dict_list: + key_tuple = tuple(d[k] for k in keys) + if key_tuple not in result: + result[key_tuple] = d + else: + result[key_tuple].update(d) + return list(result.values()) + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Tipo de produto da aquicultura": "produto", + "Produção da aquicultura": "quantidade", + "Valor da produção": "valor" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "produto", + "quantidade", "valor"]] + COLUNAS_PARA_TRATAR = ["quantidade", "valor"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Concatendo as colunas do dict_list...") + dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "produto"]) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("produto", pa.string()), + ("quantidade", pa.int64()), + ("valor", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_origem_animal/code/api_to_json.py b/models/br_ibge_ppm/producao_origem_animal/code/api_to_json.py new file mode 100644 index 00000000..a1614e86 --- /dev/null +++ b/models/br_ibge_ppm/producao_origem_animal/code/api_to_json.py @@ -0,0 +1,68 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" +AGREGADO = "74" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["106", "215"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +CLASSIFICACAO = "80" # Código pré-definido por agregado +CATEGORIAS = ["2682", "2683", "2684", "2685", "2686", "2687"] # Produtos +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada. + """ + async with session.get(url) as response: + data = await response.json() + return {'url': url, 'data': data} + +async def main(years: List[int], variables: List[str], categories: List[str]) -> None: + """ + Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + - categories (List[str]): Lista de categorias a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + for category in categories: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS)) diff --git a/models/br_ibge_ppm/producao_origem_animal/code/json_to_parquet.py b/models/br_ibge_ppm/producao_origem_animal/code/json_to_parquet.py new file mode 100644 index 00000000..cec28234 --- /dev/null +++ b/models/br_ibge_ppm/producao_origem_animal/code/json_to_parquet.py @@ -0,0 +1,139 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j["data"][0]["variavel"] + temp_unidade = j["data"][0]["unidade"] + for r in j["data"][0]["resultados"]: + temp_caracteristica = list(r["classificacoes"][0]["categoria"].values())[0] + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od["produto"] = temp_caracteristica + if temp_unidade not in [ + "Mil Cruzeiros", "Mil Cruzados", "Mil Cruzados Novos", + "Mil Cruzeiros Reais", "Mil Reais" + ]: + temp_od["unidade"] = temp_unidade + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def join_dicts_by_keys(dict_list, keys): + result = {} + for d in dict_list: + key_tuple = tuple(d[k] for k in keys) + if key_tuple not in result: + result[key_tuple] = d + else: + result[key_tuple].update(d) + return list(result.values()) + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Produção de origem animal": "quantidade", + "Valor da produção": "valor" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "produto", + "unidade", "quantidade", "valor"]] + COLUNAS_PARA_TRATAR = ["ano", "quantidade", "valor"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def currency_fix(row): + """ + Valor da produção (Mil Cruzeiros [1974 a 1985, 1990 a 1992], Mil Cruzados [1986 a 1988], + Mil Cruzados Novos [1989], Mil Cruzeiros Reais [1993], Mil Reais [1994 a 2022]) + Verificado em http://www.igf.com.br/calculadoras/conversor/conversor.htm + """ + + if 1974 <= row["ano"] <= 1985: + return row["valor"] / (1000**4 * 2.75) + elif 1986 <= row["ano"] <= 1988: + return row["valor"] / (1000**3 * 2.75) + elif row["ano"] == 1989: + return row["valor"] / (1000**2 * 2.75) + elif 1990 <= row["ano"] <= 1992: + return row["valor"] / (1000**2 * 2.75) + elif row["ano"] == 1993: + return row["valor"] / (1000 * 2.75) + else: + return row["valor"] + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Concatendo as colunas do dict_list...") + dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "produto"]) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Aplicando a correção nominal retroativa da moeda... Impacto: valor") + df["valor"] = df.apply(currency_fix, axis=1) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("produto", pa.string()), + ("unidade", pa.string()), + ("quantidade", pa.int64()), + ("valor", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_pecuaria/code/join_parquet.py b/models/br_ibge_ppm/producao_pecuaria/code/join_parquet.py new file mode 100644 index 00000000..e3100c0b --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/code/join_parquet.py @@ -0,0 +1,22 @@ +import pandas as pd +import pyarrow as pa +import os + +ANOS = range(1974, 2022 + 1) + +for temp_ano in ANOS: + df_ovinos = pd.read_parquet(f"../source/ovinos_tosquiados/parquet/ano={temp_ano}/data.parquet") + df_vacas = pd.read_parquet(f"../source/vacas_ordenhadas/parquet/ano={temp_ano}/data.parquet") + + print(f"Criando o DataFrame com os dados consolidados, referente ao ano de {temp_ano}...") + df_join = pd.merge(df_ovinos, df_vacas, on=["sigla_uf", "id_municipio"]) + + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("ovinos_tosquiados", pa.int64()), + ("vacas_ordenhadas", pa.int64())]) + df_join.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/api_to_json.py b/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/api_to_json.py new file mode 100644 index 00000000..f0532f7f --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/api_to_json.py @@ -0,0 +1,64 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]" +AGREGADO = "95" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["108"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada. + """ + async with session.get(url) as response: + data = await response.json() + return {'url': url, 'data': data} + +async def main(years: List[int], variables: List[str]) -> None: + """ + Faz requisições para a API para cada ano e variável, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS)) diff --git a/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/json_to_parquet.py b/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/json_to_parquet.py new file mode 100644 index 00000000..8a70b4a6 --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/source/ovinos_tosquiados/code/json_to_parquet.py @@ -0,0 +1,92 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j["data"][0]["variavel"] + for r in j["data"][0]["resultados"]: + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Ovinos tosquiados nos estabelecimentos agropecu\u00e1rios": "ovinos_tosquiados" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "ovinos_tosquiados"]] + COLUNAS_PARA_TRATAR = ["ovinos_tosquiados"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("ovinos_tosquiados", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/api_to_json.py b/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/api_to_json.py new file mode 100644 index 00000000..b1f021f7 --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/api_to_json.py @@ -0,0 +1,64 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm +from typing import Any, Dict, List + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]" +AGREGADO = "94" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["107"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada. + """ + async with session.get(url) as response: + data = await response.json() + return {'url': url, 'data': data} + +async def main(years: List[int], variables: List[str]) -> None: + """ + Faz requisições para a API para cada ano e variável, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS)) diff --git a/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/json_to_parquet.py b/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/json_to_parquet.py new file mode 100644 index 00000000..bf917ccd --- /dev/null +++ b/models/br_ibge_ppm/producao_pecuaria/source/vacas_ordenhadas/code/json_to_parquet.py @@ -0,0 +1,92 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j["data"][0]["variavel"] + for r in j["data"][0]["resultados"]: + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Vacas ordenhadas": "vacas_ordenhadas" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "vacas_ordenhadas"]] + COLUNAS_PARA_TRATAR = ["vacas_ordenhadas"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Transformações finalizadas!") + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("vacas_ordenhadas", pa.int64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print() \ No newline at end of file diff --git a/models/br_ibge_ppm/schema.yml b/models/br_ibge_ppm/schema.yml new file mode 100644 index 00000000..4d419cb4 --- /dev/null +++ b/models/br_ibge_ppm/schema.yml @@ -0,0 +1,133 @@ +version: 2 + +models: + - name: br_ibge_ppm__producao_pecuaria + description: Dados de pecuária contendo o número de vacas ordenhadas e ovinos tosquiados. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - [ano, sigla_uf, id_municipio, produto] + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: ovinos_tosquiados + description: Número de ovinos tosquiados + - name: vacas_ordenhadas + description: Número de vacas ordenhadas + - name: br_ibge_ppm__efetivo_rebanhos + description: Efetivo dos rebanhos (cabeça), por tipo de rebanho. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - insert unique keys here + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: tipo_rebanho + description: Tipo de rebanho + - name: quantidade + description: Quantidade de animais + - name: br_ibge_ppm__producao_origem_animal + description: Produção de origem animal, por tipo de produto e valor da produção. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - insert unique keys here + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: produto + description: Produto + - name: unidade + description: Unidade de medida do produto + - name: quantidade + description: Quantidade da produção + - name: valor + description: Valor da produção + - name: br_ibge_ppm__producao_aquicultura + description: Produção da aquicultura, de acordo com a quantidade produzida em quilogramas e valor da produção em mil reais. + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - insert unique keys here + - not_null_proportion_multiple_columns: + at_least: 0.05 + columns: + - name: ano + description: Ano + tests: + - relationships: + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano + - name: sigla_uf + description: Sigla da Unidade da Federação + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__uf') + field: sigla + - name: id_municipio + description: ID Município IBGE 7 dígitos + tests: + - relationships: + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + - name: produto + description: Produto + - name: quantidade + description: Quantidade da produção + - name: valor + description: Valor da produção From 4dcdce519d28fb7c1406d9112feef107ff1e89ba Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Mon, 6 May 2024 16:24:42 -0300 Subject: [PATCH 2/3] fix: pass lint --- dbt_project.yml | 5 +-- models/br_ibge_ppm/schema.yml | 77 +++++++++++++++++------------------ models/br_ms_sih/schema.yml | 66 ++++++++++++++++-------------- 3 files changed, 74 insertions(+), 74 deletions(-) diff --git a/dbt_project.yml b/dbt_project.yml index ec7612e8..9e4e1f5d 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -63,9 +63,6 @@ models: br_b3_cotacoes: +materialized: table +schema: br_b3_cotacoes - +post-hook: - - REVOKE `roles/bigquery.dataViewer` ON TABLE {{ this }} FROM "specialGroup:allUsers" - - GRANT `roles/bigquery.dataViewer` ON TABLE {{ this }} TO "group:bd-pro@basedosdados.org" br_bcb_agencia: +materialized: table +schema: br_bcb_agencia @@ -254,7 +251,7 @@ models: +schema: br_ms_sia br_ms_sih: +materialized: table - +schema: br_ms_sih + +schema: br_ms_sih br_ms_sim: +materialized: table +schema: br_ms_sim diff --git a/models/br_ibge_ppm/schema.yml b/models/br_ibge_ppm/schema.yml index 4d419cb4..61e87a26 100644 --- a/models/br_ibge_ppm/schema.yml +++ b/models/br_ibge_ppm/schema.yml @@ -1,33 +1,34 @@ +--- version: 2 - models: - name: br_ibge_ppm__producao_pecuaria - description: Dados de pecuária contendo o número de vacas ordenhadas e ovinos tosquiados. + description: Dados de pecuária contendo o número de vacas ordenhadas e ovinos + tosquiados. tests: - dbt_utils.unique_combination_of_columns: - combination_of_columns: - - [ano, sigla_uf, id_municipio, produto] + combination_of_columns: + - [ano, sigla_uf, id_municipio, produto] - not_null_proportion_multiple_columns: - at_least: 0.05 + at_least: 0.05 columns: - name: ano description: Ano tests: - relationships: - to: ref('br_bd_diretorios_data_tempo__ano') - field: ano.ano + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano - name: sigla_uf description: Sigla da Unidade da Federação tests: - relationships: - to: ref('br_bd_diretorios_brasil__uf') - field: sigla + to: ref('br_bd_diretorios_brasil__uf') + field: sigla - name: id_municipio description: ID Município IBGE 7 dígitos tests: - relationships: - to: ref('br_bd_diretorios_brasil__municipio') - field: id_municipio + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio - name: ovinos_tosquiados description: Número de ovinos tosquiados - name: vacas_ordenhadas @@ -36,29 +37,28 @@ models: description: Efetivo dos rebanhos (cabeça), por tipo de rebanho. tests: - dbt_utils.unique_combination_of_columns: - combination_of_columns: - - insert unique keys here + combination_of_columns: [insert unique keys here] - not_null_proportion_multiple_columns: - at_least: 0.05 + at_least: 0.05 columns: - name: ano description: Ano tests: - relationships: - to: ref('br_bd_diretorios_data_tempo__ano') - field: ano.ano + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano - name: sigla_uf description: Sigla da Unidade da Federação tests: - relationships: - to: ref('br_bd_diretorios_brasil__uf') - field: sigla + to: ref('br_bd_diretorios_brasil__uf') + field: sigla - name: id_municipio description: ID Município IBGE 7 dígitos tests: - relationships: - to: ref('br_bd_diretorios_brasil__municipio') - field: id_municipio + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio - name: tipo_rebanho description: Tipo de rebanho - name: quantidade @@ -67,29 +67,28 @@ models: description: Produção de origem animal, por tipo de produto e valor da produção. tests: - dbt_utils.unique_combination_of_columns: - combination_of_columns: - - insert unique keys here + combination_of_columns: [insert unique keys here] - not_null_proportion_multiple_columns: - at_least: 0.05 + at_least: 0.05 columns: - name: ano description: Ano tests: - relationships: - to: ref('br_bd_diretorios_data_tempo__ano') - field: ano.ano + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano - name: sigla_uf description: Sigla da Unidade da Federação tests: - relationships: - to: ref('br_bd_diretorios_brasil__uf') - field: sigla + to: ref('br_bd_diretorios_brasil__uf') + field: sigla - name: id_municipio description: ID Município IBGE 7 dígitos tests: - relationships: - to: ref('br_bd_diretorios_brasil__municipio') - field: id_municipio + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio - name: produto description: Produto - name: unidade @@ -99,32 +98,32 @@ models: - name: valor description: Valor da produção - name: br_ibge_ppm__producao_aquicultura - description: Produção da aquicultura, de acordo com a quantidade produzida em quilogramas e valor da produção em mil reais. + description: Produção da aquicultura, de acordo com a quantidade produzida em + quilogramas e valor da produção em mil reais. tests: - dbt_utils.unique_combination_of_columns: - combination_of_columns: - - insert unique keys here + combination_of_columns: [insert unique keys here] - not_null_proportion_multiple_columns: - at_least: 0.05 + at_least: 0.05 columns: - name: ano description: Ano tests: - relationships: - to: ref('br_bd_diretorios_data_tempo__ano') - field: ano.ano + to: ref('br_bd_diretorios_data_tempo__ano') + field: ano.ano - name: sigla_uf description: Sigla da Unidade da Federação tests: - relationships: - to: ref('br_bd_diretorios_brasil__uf') - field: sigla + to: ref('br_bd_diretorios_brasil__uf') + field: sigla - name: id_municipio description: ID Município IBGE 7 dígitos tests: - relationships: - to: ref('br_bd_diretorios_brasil__municipio') - field: id_municipio + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio - name: produto description: Produto - name: quantidade diff --git a/models/br_ms_sih/schema.yml b/models/br_ms_sih/schema.yml index f1f35913..8501a6f2 100644 --- a/models/br_ms_sih/schema.yml +++ b/models/br_ms_sih/schema.yml @@ -1,5 +1,5 @@ +--- version: 2 - models: - name: br_ms_sih__servicos_profissionais description: Sistema de Informações Hospitalares do SUS (SIH/SUS) @@ -24,32 +24,32 @@ models: description: Sigla unidade da fedaração tests: - relationships: - to: ref('br_bd_diretorios_data_tempo__uf') - field: sigla - config: - where: __most_recent_year_month__ + to: ref('br_bd_diretorios_data_tempo__uf') + field: sigla + config: + where: __most_recent_year_month__ - name: id_municipio_estabelecimento_aih description: Município de localização do Estabelecimento Executante da AIH tests: - relationships: - to: ref('br_bd_diretorios_brasil__municipio') - field: id_municipio - config: - where: __most_recent_year_month__ + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio + config: + where: __most_recent_year_month__ - name: id_municipio_paciente description: Município de residência do paciente tests: - relationships: - to: ref('br_bd_diretorios_brasil__municipio') - field: id_municipio_6 - config: - where: __most_recent_year_month__ + to: ref('br_bd_diretorios_brasil__municipio') + field: id_municipio_6 + config: + where: __most_recent_year_month__ - name: id_gestor description: Unidade de Federação + Código Município de Gestão ou UF0000 se - o Estabelecimento Executante está sob Gestão Estadual. + o Estabelecimento Executante está sob Gestão Estadual. - name: id_estabelecimento_cnes description: ID do estabelecimento de saúde executante da Autorização de Internação - Hospitalar (AIH) + Hospitalar (AIH) - name: id_aih description: ID Autorização de Internação Hospitalar (AIH) tests: @@ -66,34 +66,35 @@ models: description: Procedimento referente ao ato profissional - name: cbo_2002_profissional description: Identificador de Ocupação Brasileira do Profissional que realizou - o ato ou “00000” caso não tenha sido + o ato ou “00000” caso não tenha sido tests: - relationships: - to: ref('br_bd_diretorios_brasil__cbo_2002') - field: cbo_2002.cbo_2002 - config: - where: __most_recent_year_month__ + to: ref('br_bd_diretorios_brasil__cbo_2002') + field: cbo_2002.cbo_2002 + config: + where: __most_recent_year_month__ - name: quantidade_procedimentos description: Quantidade de procedimentos realizados - name: id_cid_principal description: Identificador principal da Classificação Internacional de Doenças - e Problemas Relacionados com a Saúde (CID) + e Problemas Relacionados com a Saúde (CID) - name: id_cid_principal_subcategoria - description: Subcategoria do identificador principal da Classificação Internacional de Doenças - e Problemas Relacionados com a Saúde (CID) + description: Subcategoria do identificador principal da Classificação Internacional + de Doenças e Problemas Relacionados com a Saúde (CID) tests: - custom_relationships: to: ref('br_bd_diretorios_brasil__cid_10') field: subcategoria - ignore_values: ["'R501','Q314','S571','N182','U109','M723','M725','N975','N184','R500','N183','Q356','B501','N185','U099'"] + ignore_values: + - "'R501','Q314','S571','N182','U109','M723','M725','N975','N184','R500','N183','Q356','B501','N185','U099'" config: where: __most_recent_year_month__ - name: id_cid_secundario description: Identificador secundário da Classificação Internacional de Doenças - e Problemas Relacionados com a Saúde (CID) + e Problemas Relacionados com a Saúde (CID) - name: id_cid_secundario_subcategoria - description: Subcategoria do identificador secundário da Classificação Internacional de Doenças - e Problemas Relacionados com a Saúde (CID) + description: Subcategoria do identificador secundário da Classificação Internacional + de Doenças e Problemas Relacionados com a Saúde (CID) tests: - relationships: to: ref('br_bd_diretorios_brasil__cid_10') @@ -107,20 +108,23 @@ models: - name: quantidade_pontos description: Quantidade de pontos - name: nota_fiscal - description: Nota fiscal do material empregado quando órtese/prótese, quando não, o campo representa a data do ato + description: Nota fiscal do material empregado quando órtese/prótese, quando + não, o campo representa a data do ato - name: valor_ato_profissional description: Valor do ato profissional - name: indicador_uf_hospital - description: Indica se a UF de residência do hospital é diferente da UF de localização do estabelecimento + description: Indica se a UF de residência do hospital é diferente da UF de + localização do estabelecimento - name: indicador_uf_paciente description: Indica se a UF de residência do paciente é diferente da UF de - localização do estabelecimento + localização do estabelecimento - name: indicador_id_aih description: Indica quais id_aih são únicos - name: tipo_financiamento_ato_profissional description: Tipo de financiamento do ato profissional - name: tipo_subtipo_financiamento_ato_profissional - description: Tipo de financiamento (04-FAEC) + Subtipo de financiamento relacionado ao tipo de financiamento (04-FAEC) do ato profissional + description: Tipo de financiamento (04-FAEC) + Subtipo de financiamento relacionado + ao tipo de financiamento (04-FAEC) do ato profissional - name: tipo_documento_pf description: Documento de pessoa jurídica - name: tipo_documento_pj From 76eeb367a3acc1225440d914e251a3df64732d49 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Mon, 6 May 2024 18:28:02 -0300 Subject: [PATCH 3/3] fix: pass lint --- models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql | 3 +-- models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql | 2 +- models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql | 2 +- models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql b/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql index b4e07171..d2261fbb 100644 --- a/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql +++ b/models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql @@ -7,7 +7,7 @@ "data_type": "int64", "range": {"start": 1974, "end": 2022, "interval": 1}, }, - cluster_by=["sigla_uf", "id_municipio"], + cluster_by=["sigla_uf"], ) }} select @@ -18,4 +18,3 @@ select safe_cast(quantidade as int64) quantidade from `basedosdados-staging.br_ibge_ppm_staging.efetivo_rebanhos` as t where quantidade is not null - diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql index f9aa3698..5f885c72 100644 --- a/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql @@ -7,7 +7,7 @@ "data_type": "int64", "range": {"start": 2013, "end": 2022, "interval": 1}, }, - cluster_by=["sigla_uf", "id_municipio"], + cluster_by=["sigla_uf"], ) }} select diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql index 446230c7..2b7e701d 100644 --- a/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql @@ -7,7 +7,7 @@ "data_type": "int64", "range": {"start": 1974, "end": 2022, "interval": 1}, }, - cluster_by=["sigla_uf", "id_municipio"], + cluster_by=["sigla_uf"], ) }} select diff --git a/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql b/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql index c71a328b..50c50f89 100644 --- a/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql +++ b/models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql @@ -7,7 +7,7 @@ "data_type": "int64", "range": {"start": 1974, "end": 2022, "interval": 1}, }, - cluster_by=["sigla_uf", "id_municipio"], + cluster_by=["sigla_uf"], ) }} select