-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #565 from basedosdados/br_ibge_cnefe
[dados] br_ibge_censo_2022
- Loading branch information
Showing
6 changed files
with
287 additions
and
76 deletions.
There are no files selected for viewing
49 changes: 49 additions & 0 deletions
49
models/br_ibge_censo_2022/br_ibge_censo_2022__cadastro_enderecos.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
{{ | ||
config( | ||
alias="cadastro_enderecos", | ||
schema="br_ibge_censo_2022", | ||
partition_by={ | ||
"field": "sigla_uf", | ||
"data_type": "string", | ||
}, | ||
cluster_by=["id_municipio"], | ||
) | ||
}} | ||
|
||
select | ||
safe_cast(sigla_uf as string) sigla_uf, | ||
safe_cast(cod_municipio as string) id_municipio, | ||
safe_cast(cod_distrito as string) id_distrito, | ||
safe_cast(cod_subdistrito as string) id_subdistrito, | ||
safe_cast(cod_setor as string) id_setor_censitario, | ||
safe_cast(cep as string) cep, | ||
safe_cast(cod_unico_endereco as string) id_endereco, | ||
safe_cast(num_quadra as string) numero_quadra, | ||
safe_cast(num_face as string) numero_face, | ||
safe_cast(dsc_localidade as string) localidade, | ||
safe_cast(nom_tipo_seglogr as string) tipo_segmento_logradouro, | ||
safe_cast(nom_titulo_seglogr as string) titulo_segmento_logradouro, | ||
safe_cast(nom_seglogr as string) nome_logradouro, | ||
safe_cast(num_endereco as string) numero_logradouro, | ||
safe_cast(dsc_modificador as string) modificador_numero, | ||
safe_cast(nom_comp_elem1 as string) complemento_elemento_1, | ||
safe_cast(val_comp_elem1 as string) complemento_valor_1, | ||
safe_cast(nom_comp_elem2 as string) complemento_elemento_2, | ||
safe_cast(val_comp_elem2 as string) complemento_valor_2, | ||
safe_cast(nom_comp_elem3 as string) complemento_elemento_3, | ||
safe_cast(val_comp_elem3 as string) complemento_valor_3, | ||
safe_cast(nom_comp_elem4 as string) complemento_elemento_4, | ||
safe_cast(val_comp_elem4 as string) complemento_valor_4, | ||
safe_cast(nom_comp_elem5 as string) complemento_elemento_5, | ||
safe_cast(val_comp_elem5 as string) complemento_valor_5, | ||
safe_cast(latitude as string) latitude, | ||
safe_cast(longitude as string) longitude, | ||
st_geogpoint(safe_cast(longitude as float64), safe_cast(latitude as float64)) ponto, | ||
safe_cast(nv_geo_coord as string) nivel_geocodificacao_coordenadas, | ||
safe_cast(dsc_estabelecimento as string) descricao_estabelecimento, | ||
safe_cast(cod_especie as string) tipo_especie, | ||
safe_cast(cod_indicador_estab_endereco as string) tipo_estabelecimento, | ||
safe_cast(cod_indicador_const_endereco as string) tipo_construcao, | ||
safe_cast(cod_indicador_finalidade_const as string) tipo_finalidade_construcao, | ||
safe_cast(cod_tipo_especi as string) tipo_edificacao_domicilio, | ||
from `basedosdados-dev.br_ibge_censo_2022_staging.cadastro_enderecos` as t |
22 changes: 0 additions & 22 deletions
22
models/br_ibge_censo_2022/br_ibge_censo_2022__coordenada_endereco.sql
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
from constants import constants | ||
|
||
import os | ||
import requests | ||
import zipfile | ||
import pandas as pd | ||
from io import BytesIO | ||
from tqdm import tqdm | ||
|
||
|
||
CNEFE_FILE_NAMES = constants.CNEFE_FILE_NAMES.value | ||
URL = constants.CNEFE_FTP_URL.value | ||
|
||
|
||
# Função para baixar cada arquivo do FTP | ||
def download_files_from_ftp(url: str) -> BytesIO: | ||
""" | ||
Baixa um arquivo do FTP e retorna seu conteúdo em memória. | ||
Parâmetros: | ||
url (str): URL do arquivo a ser baixado. | ||
Retorna: | ||
BytesIO: Conteúdo do arquivo baixado em memória. | ||
""" | ||
response = requests.get(url, stream=True) | ||
response.raise_for_status() | ||
total_size = int(response.headers.get('content-length', 0)) | ||
block_size = 1024 # 1 Kibibyte | ||
t = tqdm(total=total_size, unit='iB', unit_scale=True) | ||
file_data = BytesIO() | ||
for data in response.iter_content(block_size): | ||
t.update(len(data)) | ||
file_data.write(data) | ||
t.close() | ||
return file_data | ||
|
||
# Função para descompactar e processar o arquivo de São Paulo (SP) em chunks | ||
def process_sp_file(file: BytesIO, uf:str) -> None: | ||
""" | ||
Descompacta, processa e salva o arquivo de São Paulo (SP) em chunks. | ||
Parâmetros: | ||
file (BytesIO): Conteúdo do arquivo ZIP em memória. | ||
""" | ||
output_dir = f"/tmp/br_ibge_censo_2022/output/sigla_uf={uf}" | ||
os.makedirs(output_dir, exist_ok=True) | ||
chunk_number = 0 | ||
|
||
with zipfile.ZipFile(file) as z: | ||
with z.open(z.namelist()[0]) as f: | ||
for chunk in pd.read_csv(f, chunksize=1000000, sep=';', dtype=str): | ||
chunk_number += 1 | ||
chunk.to_parquet(os.path.join(output_dir, f"{uf}_chunk_{chunk_number}.parquet"), compression="gzip") | ||
print(f"Chunk {chunk_number} de {uf} salvo com sucesso.") | ||
|
||
# Função para descompactar o arquivo na sessão | ||
def unzip_file_in_session(file: BytesIO) -> pd.DataFrame: | ||
""" | ||
Descompacta um arquivo ZIP em memória e carrega seu conteúdo em um DataFrame do pandas. | ||
Parâmetros: | ||
file (BytesIO): Conteúdo do arquivo ZIP em memória. | ||
Retorna: | ||
pd.DataFrame: DataFrame com os dados do arquivo descompactado. | ||
""" | ||
with zipfile.ZipFile(file) as z: | ||
with z.open(z.namelist()[0]) as f: | ||
df = pd.read_csv(f,sep=';', dtype=str) | ||
return df | ||
|
||
# Função para salvar o DataFrame em formato parquet com compressão gzip | ||
def save_parquet(df: pd.DataFrame, mkdir: bool, table_id: str) -> None: | ||
""" | ||
Salva um DataFrame em formato parquet com compressão gzip em um diretório específico. | ||
Parâmetros: | ||
df (pd.DataFrame): DataFrame a ser salvo. | ||
mkdir (bool): Se True, cria o diretório se ele não existir. | ||
table_id (str): Identificador da tabela usado para nomear o arquivo e diretório. | ||
""" | ||
output_dir = f"/tmp/br_ibge_censo_2022/output/sigla_uf={table_id}" | ||
if mkdir: | ||
os.makedirs(output_dir, exist_ok=True) | ||
df.to_parquet(os.path.join(output_dir, f"{table_id}.parquet"), compression="gzip") | ||
|
||
# Baixar, descompactar e salvar os arquivos | ||
for uf, filename in CNEFE_FILE_NAMES.items(): | ||
url = f"{URL}/{filename}" | ||
print(f'----- Baixando o arquivo: {url}') | ||
|
||
try: | ||
zip_file = download_files_from_ftp(url) | ||
if uf in ['SP', 'RJ', 'MG', 'BA', 'RS']: | ||
process_sp_file(zip_file, uf) | ||
else: | ||
df = unzip_file_in_session(zip_file) | ||
save_parquet(df, mkdir=True, table_id=uf) | ||
print(f"Arquivo {filename} baixado e salvo com sucesso.") | ||
except Exception as e: | ||
print(f"Erro ao processar o arquivo {filename}: {e}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters