Skip to content

Commit

Permalink
insere schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
folhesgabriel committed Sep 20, 2023
1 parent 78661db commit b584619
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[materialization_flow, data],
upstream_tasks=[wait_for_materialization, data],
)
br_cvm_adm_car_pes_jur.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cvm_adm_car_pes_jur.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"table_id": "responsavel",
"dbt_alias": True,
},
)
],
Expand All @@ -41,7 +42,8 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"table_id": "pessoa_fisica",
"dbt_alias": False,
"udpate_metadata": True,
"dbt_alias": True,
},
)
],
Expand All @@ -61,7 +63,8 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"table_id": "pessoa_juridica",
"dbt_alias": False,
"udpate_metadata": True,
"dbt_alias": True,
},
)
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@
)

with case(update_metadata, True):
data = extract_last_date(dataset_id, table_id, "basedosdados")
data = extract_last_date(
dataset_id, table_id, "basedosdados", var_name="data_abertura_processo"
)
update_django_metadata(
dataset_id,
table_id,
Expand All @@ -105,7 +107,7 @@
is_free=True,
time_delta=6,
time_unit="months",
upstream_tasks=[materialization_flow, data],
upstream_tasks=[wait_for_materialization, data],
)

br_cvm_ofe_pub_dis_dia.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"materialization_mode": "prod",
"materialize after dump": True,
"table_id": "dia",
"dbt_alias": False,
"udpate_metadata": True,
"dbt_alias": True,
},
),
],
Expand Down
14 changes: 8 additions & 6 deletions pipelines/datasets/br_cvm_oferta_publica_distribuicao/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ def clean_table_oferta_distribuicao(root: str) -> str:


@task
def extract_last_date(dataset_id: str, table_id: str, billing_project_id: str) -> str:
def extract_last_date(
dataset_id: str,
table_id: str,
billing_project_id: str,
var_name: str,
) -> datetime:
"""
Extracts the last update date of a given dataset table.
Expand All @@ -87,22 +92,19 @@ def extract_last_date(dataset_id: str, table_id: str, billing_project_id: str) -
Raises:
Exception: If an error occurs while extracting the last update date.
"""
log("dasdasdsadas")

query_bd = f"""
SELECT MAX(data_abertura_processo) as max_date
SELECT MAX({var_name}) as max_date
FROM
`{billing_project_id}.{dataset_id}.{table_id}`
"""
log(f"{query_bd}")
log("dasdasdsadas")

t = bd.read_sql(
query=query_bd,
billing_project_id=billing_project_id,
from_file=True,
)
log(f"{t}")

data = t["max_date"][0]

log(f"A data mais recente da tabela é: {data}")
Expand Down

0 comments on commit b584619

Please sign in to comment.