Skip to content

Commit

Permalink
Merge pull request #467 from basedosdados/staging/br_rf_cafir
Browse files Browse the repository at this point in the history
[fix] br_rf_cafir
  • Loading branch information
folhesgabriel authored Sep 18, 2023
2 parents 3926881 + 10a8f45 commit b1e0ba4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
10 changes: 9 additions & 1 deletion pipelines/datasets/br_rf_cafir/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
parse_files_parse_date,
parse_data,
check_if_bq_data_is_outdated,
convert_datetime_to_string,
)

from pipelines.utils.constants import constants as utils_constants
Expand Down Expand Up @@ -56,6 +57,9 @@
is_outdated = check_if_bq_data_is_outdated(
dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info]
)
update_metadata_strig_date = convert_datetime_to_string(
data=info[0], upstream_tasks=[info, is_outdated]
)

with case(is_outdated, False):
log_task(f"Não há atualizações para a tabela de {table_id}!")
Expand Down Expand Up @@ -106,13 +110,17 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
# TODO: When the new snapshot is released, set is_free to True.
# It was not set now because the update_django_metadata task depends
# on a coverage that already exists in the API. Since the lag between snapshots is 5 months (6 is the default at the moment),
# there is no need to update the coverage now.

with case(update_metadata, True):
update = update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
_last_date=info[0],
_last_date=update_metadata_strig_date,
bq_last_update=False,
api_mode="prod",
date_format="yy-mm-dd",
Expand Down
16 changes: 14 additions & 2 deletions pipelines/datasets/br_rf_cafir/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) ->
# save new file as csv
df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8")

# resolve ASCII 0 no momento da leitura do BQ
# ler e salvar de novo
# resolve ASCII 0 no momento da leitura do BQ. Ler e salvar de novo.
df = pd.read_csv(save_path, dtype=str)
df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8")

Expand All @@ -179,3 +178,16 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) ->
)

return files_path


@task
def convert_datetime_to_string(data: datetime):
    """Return the string representation of ``data``.

    The value is passed through the built-in ``str``; for a
    ``datetime`` this yields the ISO-like ``YYYY-MM-DD HH:MM:SS`` text
    (with a fractional part when microseconds are non-zero).

    Args:
        data (datetime): Value to be converted.

    Returns:
        str: The result of ``str(data)``.
    """
    as_text = str(data)
    return as_text

0 comments on commit b1e0ba4

Please sign in to comment.