Skip to content

Commit

Permalink
Add task to check for data updates and use it to
Browse files Browse the repository at this point in the history
update metadata
  • Loading branch information
tricktx committed Nov 6, 2023
1 parent 46dfea3 commit 662547f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
21 changes: 9 additions & 12 deletions pipelines/datasets/br_anatel_telefonia_movel/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
clean_csv_municipio,
clean_csv_uf,
data_url,
task_check_for_data
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand Down Expand Up @@ -131,8 +132,7 @@
)

with case(update_metadata, True):
date = data_url()
date_string = str(date) # task que retorna a data atual
date = task_check_for_data()
update_django_metadata(
dataset_id,
table_id[0],
Expand All @@ -145,7 +145,7 @@
time_unit="months",
api_mode="prod",
date_format="yy-mm",
_last_date=date_string,
_last_date=date,
upstream_tasks=[wait_for_materialization],
)

Expand Down Expand Up @@ -189,8 +189,7 @@
)

with case(update_metadata, True):
date = data_url()
date_string = str(date) # task que retorna a data atual
date = task_check_for_data()
update_django_metadata(
dataset_id,
table_id[1],
Expand All @@ -203,7 +202,7 @@
time_unit="months",
api_mode="prod",
date_format="yy-mm",
_last_date=date_string,
_last_date=date,
upstream_tasks=[wait_for_materialization],
)

Expand Down Expand Up @@ -249,8 +248,7 @@
)

with case(update_metadata, True):
date = data_url()
date_string = str(date) # task que retorna a data atual
date = task_check_for_data()
update_django_metadata(
dataset_id,
table_id[2],
Expand All @@ -263,7 +261,7 @@
time_unit="months",
api_mode="prod",
date_format="yy-mm",
_last_date=date_string,
_last_date=date,
upstream_tasks=[wait_for_materialization],
)

Expand Down Expand Up @@ -308,8 +306,7 @@
)

with case(update_metadata, True):
date = data_url()
date_string = str(date) # task que retorna a data atual
date = task_check_for_data()
update_django_metadata(
dataset_id,
table_id[3],
Expand All @@ -322,7 +319,7 @@
time_unit="months",
api_mode="prod",
date_format="yy-mm",
_last_date=date_string,
_last_date=date,
upstream_tasks=[wait_for_materialization],
)

Expand Down
6 changes: 5 additions & 1 deletion pipelines/datasets/br_anatel_telefonia_movel/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""

import os
from datetime import timedelta
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -53,6 +53,7 @@ def data_url(anos, mes_um, mes_dois):
@task
def check_for_updates(dataset_id, table_id):
data_obj = data_url("2023", "07", "12")
data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date()
# Obtém a última data no site BD
data_bq_obj = extract_last_date(
dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta"
Expand All @@ -68,6 +69,9 @@ def check_for_updates(dataset_id, table_id):
else:
return False # Não há novas atualizações disponíveis

@task
def task_check_for_data():
return data_url("2023", "07", "12")

# ! TASK MICRODADOS
@task(
Expand Down

0 comments on commit 662547f

Please sign in to comment.