Skip to content

Commit

Permalink
Merge branch 'master' into staging/br_cgu_servidores_executivo_federal
Browse files Browse the repository at this point in the history
  • Loading branch information
aspeddro committed Sep 26, 2023
2 parents 6e666da + 107a367 commit 4276314
Show file tree
Hide file tree
Showing 16 changed files with 815 additions and 300 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
- id: no-commit-to-branch # prevents committing to protected branches
- id: trailing-whitespace # prevents trailing whitespace
- repo: https://github.com/psf/black
rev: 23.7.0
rev: 23.9.1
hooks:
- id: black

Expand Down
16 changes: 8 additions & 8 deletions pipelines/datasets/br_bd_indicadores/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
interval=timedelta(days=1),
start_date=datetime(2022, 5, 18, 16, 24),
labels=[
constants.BASEDOSDADOS_DEV_AGENT_LABEL.value,
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_bd_indicadores",
"table_id": "twitter_metrics",
"materialization_mode": "dev",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": False,
},
Expand All @@ -56,11 +56,11 @@
interval=timedelta(weeks=1),
start_date=datetime(2021, 1, 1, 17, 35),
labels=[
constants.BASEDOSDADOS_DEV_AGENT_LABEL.value,
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_bd_indicadores",
"materialization_mode": "dev",
"materialization_mode": "prod",
"materialize_after_dump": True,
"table_id": "twitter_metrics_agg",
},
Expand Down Expand Up @@ -120,14 +120,14 @@
interval=timedelta(days=1),
start_date=datetime(2022, 1, 1, 6, 0),
labels=[
constants.BASEDOSDADOS_DEV_AGENT_LABEL.value,
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_bd_indicadores",
"table_id": "equipes",
"sheet_id": "1gLJyoxiFeIRn7FKiP3Fpbr04bScVuhmF",
"sheet_name": "equipes",
"materialization_mode": "dev",
"materialization_mode": "prod",
"materialize_after_dump": True,
},
),
Expand All @@ -142,14 +142,14 @@
interval=timedelta(days=1),
start_date=datetime(2022, 1, 1, 6, 0),
labels=[
constants.BASEDOSDADOS_DEV_AGENT_LABEL.value,
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_bd_indicadores",
"table_id": "pessoas",
"sheet_id": "1cQj9ItJoO_AQElRT2ngpHZXhFCSpQCrV",
"sheet_name": "pessoas",
"materialization_mode": "dev",
"materialization_mode": "prod",
"materialize_after_dump": True,
},
),
Expand Down
98 changes: 45 additions & 53 deletions pipelines/datasets/br_cvm_administradores_carteira/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
clean_table_responsavel,
clean_table_pessoa_fisica,
clean_table_pessoa_juridica,
extract_last_date,
)
from pipelines.utils.metadata.tasks import update_django_metadata

from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
update_metadata,
get_temporal_coverage,
rename_current_flow_run_dataset_table,
get_current_flow_labels,
Expand All @@ -37,7 +39,7 @@
URL = "http://dados.cvm.gov.br/dados/ADM_CART/CAD/DADOS/cad_adm_cart.zip"

with Flow(
name="br_cvm_administradores_carteira.responsavel", code_owners=["lucas_cr"]
name="br_cvm_administradores_carteira.responsavel", code_owners=["Equipe Pipelines"]
) as br_cvm_adm_car_res:
# Parameters
dataset_id = Parameter(
Expand Down Expand Up @@ -68,17 +70,7 @@
dump_mode="append",
wait=filepath,
)

# no generate temporal coverage since there is no date variable
# wait_update_metadata = update_metadata(
# dataset_id=dataset_id,
# table_id=table_id,
# fields_to_update=[
# {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}}
# ],
# upstream_tasks=[wait_upload_table],
# )

    # don't generate temporal coverage since there's no date variable
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
Expand Down Expand Up @@ -113,7 +105,7 @@
br_cvm_adm_car_res.schedule = schedule_responsavel

with Flow(
"br_cvm_administradores_carteira.pessoa_fisica", code_owners=["lucas_cr"]
"br_cvm_administradores_carteira.pessoa_fisica", code_owners=["Equipe Pipelines"]
) as br_cvm_adm_car_pes_fis:
# Parameters
dataset_id = Parameter(
Expand All @@ -129,6 +121,7 @@
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
Expand All @@ -145,25 +138,6 @@
wait=filepath,
)

# update_metadata
temporal_coverage = get_temporal_coverage(
filepath=filepath,
date_cols=["data_registro"],
time_unit="day",
interval="1",
upstream_tasks=[wait_upload_table],
)

# wait_update_metadata = update_metadata(
# dataset_id=dataset_id,
# table_id=table_id,
# fields_to_update=[
# {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}},
# {"temporal_coverage": [temporal_coverage]},
# ],
# upstream_tasks=[temporal_coverage],
# )

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
Expand Down Expand Up @@ -192,13 +166,31 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
data = extract_last_date(
dataset_id, table_id, "basedosdados", var_name="data_registro"
)
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
_last_date=data,
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[materialization_flow, data],
)

br_cvm_adm_car_pes_fis.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cvm_adm_car_pes_fis.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cvm_adm_car_pes_fis.schedule = schedule_fisica

with Flow(
"br_cvm_administradores_carteira.pessoa_juridica", code_owners=["lucas_cr"]
"br_cvm_administradores_carteira.pessoa_juridica", code_owners=["Equipe Pipelines"]
) as br_cvm_adm_car_pes_jur:
# Parameters
dataset_id = Parameter(
Expand All @@ -214,6 +206,7 @@
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
Expand All @@ -230,25 +223,6 @@
wait=filepath,
)

# update_metadata
temporal_coverage = get_temporal_coverage(
filepath=filepath,
date_cols=["data_registro"],
time_unit="day",
interval="1",
upstream_tasks=[wait_upload_table],
)

# wait_update_metadata = update_metadata(
# dataset_id=dataset_id,
# table_id=table_id,
# fields_to_update=[
# {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}},
# {"temporal_coverage": [temporal_coverage]},
# ],
# upstream_tasks=[temporal_coverage],
# )

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
Expand Down Expand Up @@ -278,6 +252,24 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
data = extract_last_date(
dataset_id, table_id, "basedosdados", var_name="data_registro"
)
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
_last_date=data,
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[wait_for_materialization, data],
)
br_cvm_adm_car_pes_jur.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cvm_adm_car_pes_jur.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cvm_adm_car_pes_jur.schedule = schedule_juridica
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"table_id": "responsavel",
"dbt_alias": True,
},
)
],
Expand All @@ -41,7 +42,8 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"table_id": "pessoa_fisica",
"dbt_alias": False,
"update_metadata": True,
"dbt_alias": True,
},
)
],
Expand All @@ -61,7 +63,8 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"table_id": "pessoa_juridica",
"dbt_alias": False,
"update_metadata": True,
"dbt_alias": True,
},
)
],
Expand Down
44 changes: 44 additions & 0 deletions pipelines/datasets/br_cvm_administradores_carteira/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from pandas.api.types import is_string_dtype
from prefect import task
from unidecode import unidecode
import basedosdados as bd
from datetime import datetime
from pipelines.utils.utils import log


@task
Expand Down Expand Up @@ -150,3 +153,44 @@ def clean_table_pessoa_juridica(root: str) -> str:
dataframe.to_csv(ou_filepath, index=False)

return ou_filepath


@task
def extract_last_date(
    dataset_id: str,
    table_id: str,
    billing_project_id: str,
    var_name: str,
) -> str:
    """
    Extracts the most recent value of a date column from a BigQuery table.

    Runs ``SELECT MAX(var_name)`` against
    ``{billing_project_id}.{dataset_id}.{table_id}`` and returns the result
    as a string.

    Args:
        dataset_id (str): The ID of the dataset.
        table_id (str): The ID of the table.
        billing_project_id (str): The billing project ID (also used as the
            query's project prefix).
        var_name (str): Name of the date column to take the MAX of
            (e.g. ``"data_registro"``).

    Returns:
        str: The maximum value of ``var_name`` rendered as a string; for a
        DATE column this is ``'yyyy-mm-dd'``.

    Raises:
        Exception: If an error occurs while extracting the last update date.
    """
    # NOTE(review): identifiers are interpolated directly into the SQL
    # string — acceptable only because they come from internal flow
    # parameters, never from untrusted input.
    query_bd = f"""
    SELECT MAX({var_name}) as max_date
    FROM
    `{billing_project_id}.{dataset_id}.{table_id}`
    """

    t = bd.read_sql(
        query=query_bd,
        billing_project_id=billing_project_id,
        from_file=True,
    )

    # Positional access: robust even if the result's index is not a
    # default RangeIndex (t["max_date"][0] would look up the *label* 0).
    data = t["max_date"].iloc[0]

    log(f"A data mais recente da tabela é: {data}")

    return str(data)
2 changes: 2 additions & 0 deletions pipelines/datasets/br_cvm_fi/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,5 @@ class constants(Enum): # pylint: disable=c0103
URL_BALANCETE = "https://dados.cvm.gov.br/dados/FI/DOC/BALANCETE/DADOS/"

ARQUITETURA_URL_BALANCETE = "https://docs.google.com/spreadsheets/d/1eIMo_hYHy89oh6kHRN9Kh0NytUZzr8__/edit#gid=1045172528"

ARQUITETURA_URL_CDA = "https://docs.google.com/spreadsheets/d/1V2XHBXBB_biC0cLoMZ3FxtbC7CPLxQXZhIY7iJDtsSw/edit#gid=0"
Loading

0 comments on commit 4276314

Please sign in to comment.