adds update_metadata to br_cvm_administradores_carteira
folhesgabriel committed Sep 20, 2023
1 parent 7347135 commit 78661db
Showing 2 changed files with 89 additions and 53 deletions.
98 changes: 45 additions & 53 deletions pipelines/datasets/br_cvm_administradores_carteira/flows.py
@@ -21,13 +21,15 @@
clean_table_responsavel,
clean_table_pessoa_fisica,
clean_table_pessoa_juridica,
extract_last_date,
)
from pipelines.utils.metadata.tasks import update_django_metadata

from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
update_metadata,
get_temporal_coverage,
rename_current_flow_run_dataset_table,
get_current_flow_labels,
@@ -37,7 +39,7 @@
URL = "http://dados.cvm.gov.br/dados/ADM_CART/CAD/DADOS/cad_adm_cart.zip"

with Flow(
name="br_cvm_administradores_carteira.responsavel", code_owners=["lucas_cr"]
name="br_cvm_administradores_carteira.responsavel", code_owners=["Equipe Pipelines"]
) as br_cvm_adm_car_res:
# Parameters
dataset_id = Parameter(
@@ -68,17 +70,7 @@
dump_mode="append",
wait=filepath,
)

# no generate temporal coverage since there is no date variable
# wait_update_metadata = update_metadata(
# dataset_id=dataset_id,
# table_id=table_id,
# fields_to_update=[
# {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}}
# ],
# upstream_tasks=[wait_upload_table],
# )

# don't generate temporal coverage since there's no date variable
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
@@ -113,7 +105,7 @@
br_cvm_adm_car_res.schedule = schedule_responsavel

with Flow(
"br_cvm_administradores_carteira.pessoa_fisica", code_owners=["lucas_cr"]
"br_cvm_administradores_carteira.pessoa_fisica", code_owners=["Equipe Pipelines"]
) as br_cvm_adm_car_pes_fis:
# Parameters
dataset_id = Parameter(
@@ -129,6 +121,7 @@
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -145,25 +138,6 @@
wait=filepath,
)

# update_metadata
temporal_coverage = get_temporal_coverage(
filepath=filepath,
date_cols=["data_registro"],
time_unit="day",
interval="1",
upstream_tasks=[wait_upload_table],
)

# wait_update_metadata = update_metadata(
# dataset_id=dataset_id,
# table_id=table_id,
# fields_to_update=[
# {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}},
# {"temporal_coverage": [temporal_coverage]},
# ],
# upstream_tasks=[temporal_coverage],
# )

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
@@ -192,13 +166,31 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
data = extract_last_date(
dataset_id, table_id, "basedosdados", var_name="data_registro"
)
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
_last_date=data,
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[wait_for_materialization, data],
)

br_cvm_adm_car_pes_fis.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cvm_adm_car_pes_fis.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cvm_adm_car_pes_fis.schedule = schedule_fisica

with Flow(
"br_cvm_administradores_carteira.pessoa_juridica", code_owners=["lucas_cr"]
"br_cvm_administradores_carteira.pessoa_juridica", code_owners=["Equipe Pipelines"]
) as br_cvm_adm_car_pes_jur:
# Parameters
dataset_id = Parameter(
@@ -214,6 +206,7 @@
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -230,25 +223,6 @@
wait=filepath,
)

# update_metadata
temporal_coverage = get_temporal_coverage(
filepath=filepath,
date_cols=["data_registro"],
time_unit="day",
interval="1",
upstream_tasks=[wait_upload_table],
)

# wait_update_metadata = update_metadata(
# dataset_id=dataset_id,
# table_id=table_id,
# fields_to_update=[
# {"last_updated": {"data": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}},
# {"temporal_coverage": [temporal_coverage]},
# ],
# upstream_tasks=[temporal_coverage],
# )

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
@@ -278,6 +252,24 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
data = extract_last_date(
dataset_id, table_id, "basedosdados", var_name="data_registro"
)
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
_last_date=data,
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[wait_for_materialization, data],
)
br_cvm_adm_car_pes_jur.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cvm_adm_car_pes_jur.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cvm_adm_car_pes_jur.schedule = schedule_juridica
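
For reference, the conditional-update pattern the three flows now share, reduced to a minimal standalone sketch. This assumes prefect 1.x (the API the flows above use); the stub tasks are illustrative placeholders, not the real extract_last_date / update_django_metadata:

from prefect import Flow, Parameter, case, task

@task
def extract_last_date_stub() -> str:
    # Stand-in for extract_last_date: the real task queries MAX(data_registro).
    return "2023-09-20"

@task
def update_django_metadata_stub(last_date: str) -> None:
    # Stand-in for update_django_metadata: the real task patches the metadata API.
    print(f"updating metadata coverage up to {last_date}")

with Flow("metadata_update_sketch") as sketch_flow:
    update_metadata = Parameter("update_metadata", default=False, required=False)
    with case(update_metadata, True):
        # Both tasks register at build time; a run with update_metadata=False
        # simply skips this branch instead of failing.
        data = extract_last_date_stub()
        update_django_metadata_stub(data)

Passing `data` as an argument already creates the task dependency inside the branch, which is why the flows above only list explicit upstream_tasks for the materialization step.
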
44 changes: 44 additions & 0 deletions pipelines/datasets/br_cvm_administradores_carteira/tasks.py
@@ -12,6 +12,9 @@
from pandas.api.types import is_string_dtype
from prefect import task
from unidecode import unidecode
import basedosdados as bd
from datetime import datetime
from pipelines.utils.utils import log


@task
@@ -150,3 +153,44 @@ def clean_table_pessoa_juridica(root: str) -> str:
dataframe.to_csv(ou_filepath, index=False)

return ou_filepath


@task
def extract_last_date(
dataset_id: str,
table_id: str,
billing_project_id: str,
var_name: str,
) -> str:
"""
Extracts the most recent value of a date column from a given dataset table.
Args:
dataset_id (str): The ID of the dataset.
table_id (str): The ID of the table.
billing_project_id (str): The billing project ID.
var_name (str): The name of the date column to take the maximum of.
Returns:
str: The most recent date, in the format 'yyyy-mm-dd'.
Raises:
Exception: If an error occurs while extracting the date.
"""

query_bd = f"""
SELECT MAX({var_name}) as max_date
FROM
`{billing_project_id}.{dataset_id}.{table_id}`
"""

t = bd.read_sql(
query=query_bd,
billing_project_id=billing_project_id,
from_file=True,
)

data = t["max_date"][0]

log(f"A data mais recente da tabela é: {data}")

return str(data)
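
To see what this task actually sends to BigQuery, here is the query string rendered with example arguments; the snippet is hypothetical and not part of the commit, and table_id="pessoa_fisica" is assumed for illustration (the Parameter defaults are collapsed in the diff above):

# Hypothetical rendering of the query extract_last_date builds.
dataset_id = "br_cvm_administradores_carteira"
table_id = "pessoa_fisica"  # assumed table_id, for illustration only
billing_project_id = "basedosdados"
var_name = "data_registro"

query_bd = f"""
    SELECT MAX({var_name}) as max_date
    FROM
    `{billing_project_id}.{dataset_id}.{table_id}`
"""
print(query_bd)
# SELECT MAX(data_registro) as max_date
# FROM
# `basedosdados.br_cvm_administradores_carteira.pessoa_fisica`
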
