[pipelines] br_mg_belohorizonte_smfa_iptu #448

Merged
merged 39 commits into master from staging/br_mg_smfa on Sep 13, 2023

39 commits
2cde8aa
add pipeline iptu belohorizonte
tricktx Aug 31, 2023
99f8a33
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2023
0df7700
Merge branch 'master' into staging/br_mg_smfa
mergify[bot] Aug 31, 2023
ce1936c
name function
tricktx Aug 31, 2023
8f91ee3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2023
0f8ee5c
make_partitions
tricktx Aug 31, 2023
24f7517
remove upstream
tricktx Aug 31, 2023
c36d175
return tasks_pipeline
tricktx Aug 31, 2023
96f6298
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2023
cf18bc0
remove pd.dataframe make_partitions
tricktx Sep 1, 2023
0a0042d
add decorator
tricktx Sep 1, 2023
6c220cb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2023
1f672bf
add input get_max_data
tricktx Sep 1, 2023
99a96a3
rename constants
tricktx Sep 1, 2023
7c3360c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2023
d7505cd
constants_iptu
tricktx Sep 1, 2023
dfb1060
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2023
d925244
function changing coordinates
tricktx Sep 4, 2023
c354afe
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 4, 2023
9449c39
Merge branch 'master' into staging/br_mg_smfa
mergify[bot] Sep 4, 2023
d27a335
install geopandas and shapely in poetry lock
tricktx Sep 4, 2023
9c9b52e
Merge branch 'master' into staging/br_mg_smfa
mergify[bot] Sep 4, 2023
6b8a893
Merge branch 'master' into staging/br_mg_smfa
mergify[bot] Sep 5, 2023
7618fe9
refactoring code
tricktx Sep 5, 2023
b9eac26
remove logs
tricktx Sep 6, 2023
d99a674
test check_for_updates
tricktx Sep 6, 2023
72f27eb
test two check for updates
tricktx Sep 6, 2023
d89a8f4
check for updates
tricktx Sep 6, 2023
d4525c2
setting to prod
tricktx Sep 12, 2023
2b5d8d4
Merge branch 'master' into staging/br_mg_smfa
mergify[bot] Sep 12, 2023
0c30e3b
fixing PR errors
tricktx Sep 12, 2023
0a6afd7
modifying the equality check.
tricktx Sep 12, 2023
c6459d3
testing one last time in the cloud
tricktx Sep 13, 2023
fbad029
reverting back to check for updates
tricktx Sep 13, 2023
9172cc8
changing billing id
tricktx Sep 13, 2023
cc7dcbf
adjusting in prod
tricktx Sep 13, 2023
185cf8a
add upstream
tricktx Sep 13, 2023
421b482
check for updates
tricktx Sep 13, 2023
e23cef9
Merge branch 'master' into staging/br_mg_smfa
tricktx Sep 13, 2023
1 change: 1 addition & 0 deletions pipelines/datasets/__init__.py
@@ -45,4 +45,5 @@
from pipelines.datasets.br_mp_pep_cargos_funcoes.flows import *
from pipelines.datasets.br_ans_beneficiario.flows import *
from pipelines.datasets.br_anp_precos_combustiveis.flows import *
from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.flows import *
from pipelines.datasets.br_rf_cafir.flows import *
Empty file.
85 changes: 85 additions & 0 deletions pipelines/datasets/br_mg_belohorizonte_smfa_iptu/constants.py
@@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
"""
Constant values for the datasets projects
"""
from enum import Enum


class constants(Enum): # pylint: disable=c0103
RENAME = {
"NULOTCTM": "lote",
"INDICE_CADASTRAL": "indice_cadastral",
"ZONEAMENTO_PVIPTU": "zoneamento",
"ZONA_HOMOGENIA": "zona_homogenea",
"CEP": "cep",
"TIPO_LOGRADOURO": "tipo_logradouro",
"NOME_LOGRADOURO": "logradouro",
"NUMERO_IMOVEL": "numero_imovel",
"TIPO_CONSTRUTIVO": "tipo_construtivo",
"TIPO_OCUPACAO": "tipo_ocupacao",
"PADRAO_ACABAMENTO": "padrao_acabamento",
"TIPOLOGIA": "tipologia",
"QUANTIDADE_ECONOMIAS": "codigo_quantidade_economia",
"FREQUENCIA_COLETA": "frequencia_coleta",
"IND_REDE_TELEFONICA": "indicador_rede_telefonica",
"IND_MEIO_FIO": "indicador_meio_fio",
"IND_PAVIMENTACAO": "indicador_pavimentacao",
"IND_ARBORIZACAO": "indicador_arborizacao",
"IND_GALERIA_PLUVIAL": "indicador_galeria_pluvial",
"IND_ILUMINACAO_PUBLICA": "indicador_iluminacao_publica",
"IND_REDE_ESGOTO": "indicador_rede_esgoto",
"IND_REDE_AGUA": "indicador_agua",
"GEOMETRIA": "poligono",
"FRACAO_IDEAL": "fracao_ideal",
"AREA_TERRENO": "area_terreno",
"AREA_CONSTRUCAO": "area_construida",
}

ORDEM = [
"ano",
"mes",
"indice_cadastral",
"lote",
"zoneamento",
"zona_homogenea",
"cep",
"endereco",
"tipo_construtivo",
"tipo_ocupacao",
"padrao_acabamento",
"tipologia",
"codigo_quantidade_economia",
"frequencia_coleta",
"indicador_rede_telefonica",
"indicador_meio_fio",
"indicador_pavimentacao",
"indicador_arborizacao",
"indicador_galeria_pluvial",
"indicador_iluminacao_publica",
"indicador_rede_esgoto",
"indicador_agua",
"poligono",
"fracao_ideal",
"area_terreno",
"area_construida",
]

URLS = [
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario-regional-venda-nova",
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario-regional-pampulha",
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario-regional-oeste",
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario-regional-norte",
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario-regional-noroeste",
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario-regional-nordeste",
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario-regional-leste",
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario-regional-centro-sul",
"https://dados.pbh.gov.br/dataset/cadastro-imobiliario",
]

INPUT_PATH = "/tmp/input/"

OUTPUT_PATH = "/tmp/output/"

HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
}
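
Since `constants` is an `Enum`, every value above is read through `.value` by the tasks further down. A minimal sketch of how these constants are meant to be consumed — the `df.rename`/`reindex` calls are illustrative assumptions, since the real transformation lives in the dataset's `utils.py`, which is outside this diff:

import pandas as pd

from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.constants import constants

# Build an empty frame with the raw SMFA column names, then apply the mapping
df = pd.DataFrame(columns=list(constants.RENAME.value))
df = df.rename(columns=constants.RENAME.value)  # raw names -> snake_case
df = df.reindex(columns=constants.ORDEM.value)  # enforce the final column order
print(constants.INPUT_PATH.value)  # "/tmp/input/"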
128 changes: 128 additions & 0 deletions pipelines/datasets/br_mg_belohorizonte_smfa_iptu/flows.py
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
"""
Flows for br_mg_belohorizonte_smfa_iptu
"""
from datetime import timedelta
from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from pipelines.utils.metadata.tasks import update_django_metadata
from pipelines.constants import constants
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.constants import (
constants as constants_iptu,
)
from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.tasks import (
download_and_transform,
make_partitions,
get_max_data,
check_for_updates,
)

from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
rename_current_flow_run_dataset_table,
get_current_flow_labels,
log_task,
)

from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.schedules import every_weeks_iptu

with Flow(
name="br_mg_belohorizonte_smfa_iptu.iptu", code_owners=["trick"]
) as br_mg_belohorizonte_smfa_iptu_iptu:
# Parameters
dataset_id = Parameter(
"dataset_id", default="br_mg_belohorizonte_smfa_iptu", required=True
)
table_id = Parameter("table_id", default="iptu", required=True)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=False
)
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
update_metadata = Parameter("update_metadata", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
dados_desatualizados = check_for_updates(dataset_id=dataset_id, table_id=table_id)
log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}")
with case(dados_desatualizados, False):
log_task(
"Dados atualizados, não é necessário fazer o download",
upstream_tasks=[dados_desatualizados],
)

with case(dados_desatualizados, True):
df = download_and_transform()
output_filepath = make_partitions(df, upstream_tasks=[df])
wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=output_filepath,
upstream_tasks=[output_filepath],
)

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
upstream_tasks=[wait_upload_table],
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

data_max = get_max_data(
input=constants_iptu.INPUT_PATH.value,
upstream_tasks=[wait_for_materialization],
)

with case(update_metadata, True):
update_django_metadata(
dataset_id="br_mg_belohorizonte_smfa_iptu",
table_id="iptu",
metadata_type="DateTimeRange",
bq_last_update=False,
bq_table_last_year_month=False,
_last_date=data_max,
api_mode="prod",
date_format="yy-mm",
is_bd_pro=True,
is_free=False,
upstream_tasks=[data_max],
)

br_mg_belohorizonte_smfa_iptu_iptu.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_mg_belohorizonte_smfa_iptu_iptu.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
)
br_mg_belohorizonte_smfa_iptu_iptu.schedule = every_weeks_iptu
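
The two `case` blocks above are Prefect 1.x conditionals: tasks created inside `with case(dados_desatualizados, True):` only run when the upstream result equals `True`, and the non-matching branch is skipped rather than failed. A self-contained sketch of the pattern, assuming only `prefect` 1.x:

from prefect import Flow, case, task

@task
def is_stale() -> bool:
    return True  # stand-in for check_for_updates

@task
def download():
    print("downloading...")

with Flow("case_sketch") as sketch_flow:
    stale = is_stale()
    with case(stale, True):  # runs only when is_stale() returns True
        download()

sketch_flow.run()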
27 changes: 27 additions & 0 deletions pipelines/datasets/br_mg_belohorizonte_smfa_iptu/schedules.py
@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
"""
Schedules for br_mg_belohorizonte_smfa_iptu
"""

from datetime import timedelta, datetime
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from pipelines.constants import constants

every_weeks_iptu = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value],
parameter_defaults={
"dataset_id": "br_mg_belohorizonte_smfa_iptu",
"table_id": "iptu",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"update_metadata": "True",
},
),
]
)
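
Note that this clock fires daily — `interval=timedelta(days=1)` — despite the `every_weeks_iptu` name. If a weekly cadence is what is actually intended, the change is one line (a sketch only, not part of this PR):

from datetime import datetime, timedelta
from prefect.schedules.clocks import IntervalClock

weekly_clock = IntervalClock(
    interval=timedelta(weeks=1),  # once a week instead of once a day
    start_date=datetime(2021, 1, 1),
)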
108 changes: 108 additions & 0 deletions pipelines/datasets/br_mg_belohorizonte_smfa_iptu/tasks.py
@@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
"""
Tasks for br_mg_belohorizonte_smfa_iptu
"""
from prefect import task
import requests
from bs4 import BeautifulSoup
from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.constants import constants
from pipelines.utils.utils import extract_last_date, log, to_partitions
import os
from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.utils import (
scrapping_download_csv,
concat_csv,
rename_columns,
fix_variables,
new_column_endereco,
new_columns_ano_mes,
reorder_and_fix_nan,
changing_coordinates,
)


@task # noqa
def download_and_transform():
log("Iniciando o web scrapping e download dos arquivos csv")
scrapping_download_csv(input_path=constants.INPUT_PATH.value)

log("Iniciando a concatenação dos arquivos csv")
df = concat_csv(input_path=constants.INPUT_PATH.value)

log("Iniciando a renomeação das colunas")
df = rename_columns(df=df)

log("Iniciando a substituição de variáveis")
df = fix_variables(df=df)

log("Iniciando a criação da coluna endereço")
df = new_column_endereco(df=df)

log("Iniciando a criação das colunas ano e mes")
df = new_columns_ano_mes(df=df)

log("Iniciando a mudança de coordenadas")
df = changing_coordinates(df=df)

log("Iniciando a reordenação das colunas")
df = reorder_and_fix_nan(df=df)

return df


@task
def make_partitions(df):
log("Iniciando a partição dos dados")

to_partitions(
data=df, partition_columns=["ano", "mes"], savepath=constants.OUTPUT_PATH.value
)

return constants.OUTPUT_PATH.value
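# to_partitions is expected to write hive-style partition folders under
# OUTPUT_PATH, e.g. /tmp/output/ano=2023/mes=9/<file>.csv (hypothetical
# layout; the exact naming is decided inside pipelines.utils.utils.to_partitions).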


def data_url(url, headers):
    """Scrape the dataset page and return the period (YYYY-MM) of the most recent CSV link."""
    response = requests.get(url, headers=headers)

    soup = BeautifulSoup(response.content, "html.parser")

    # Keep only anchors whose href points to a .csv file
    links = soup.find_all("a", href=lambda href: href and href.endswith(".csv"))

    if links:
        link = links[-1]
        # The first six characters of the file name encode the period as YYYYMM
        filename = link.get("href").split("/")[-1][:6]
        data_final = filename[0:4] + "-" + filename[4:6]
        return data_final
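# Example (hypothetical href): ".../20230901_regional.csv" -> the basename's
# first six characters give filename == "202309", so data_final == "2023-09".
# If the page has no CSV links, the function implicitly returns None.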


@task
def check_for_updates(dataset_id, table_id):
"""
Checks if there are available updates for a specific dataset and table.

Returns:
bool: Returns True if updates are available, otherwise returns False.
"""
    # Get the most recent date published on the SMFA site
    data_obj = data_url(constants.URLS.value[0], constants.HEADERS.value)

    # Get the latest date already available in basedosdados
    data_bq_obj = extract_last_date(dataset_id, table_id, "yy-mm", "basedosdados")

    # Log both dates
    log(f"Latest date on the SMFA site: {data_obj}")
    log(f"Latest date in basedosdados: {data_bq_obj}")

    # Compare the dates to check whether there are updates
    if data_obj > data_bq_obj:
        return True  # updates available
    else:
        return False  # no new updates available
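# Both values are zero-padded "YYYY-MM" strings, so the lexicographic `>`
# above matches chronological order, e.g.:
#   "2023-09" > "2023-08"  -> True
#   "2024-01" > "2023-12"  -> True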


@task
def get_max_data(input):
    # File names in the input directory start with the period as YYYYMM;
    # take the first file and format its period as "YYYY-MM".
    arquivos = os.listdir(input)
    datas = [arquivo[0:6] for arquivo in arquivos]
    resultado = datas[0]
    data = resultado[0:4] + "-" + resultado[4:6]
    return data
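
For a quick local check of the two date helpers outside a flow run, something along these lines should work — it assumes network access to dados.pbh.gov.br and that INPUT_PATH was already populated by `scrapping_download_csv`; `.run()` invokes a Prefect 1.x task outside a Flow:

from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.constants import constants
from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.tasks import data_url, get_max_data

print(data_url(constants.URLS.value[0], constants.HEADERS.value))  # e.g. "2023-09"
print(get_max_data.run(input=constants.INPUT_PATH.value))  # e.g. "2023-09"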