From 7d4288c8aa2a81d1126de4d10173501d542fb0f3 Mon Sep 17 00:00:00 2001
From: folhesgabriel
Date: Fri, 15 Sep 2023 11:46:38 -0300
Subject: [PATCH 1/6] test update_metadata again

---
 pipelines/datasets/br_rf_cafir/flows.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py
index 34f9a5a42..d97924e80 100644
--- a/pipelines/datasets/br_rf_cafir/flows.py
+++ b/pipelines/datasets/br_rf_cafir/flows.py
@@ -57,6 +57,22 @@
         dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info]
     )
 
+    with case(update_metadata, True):
+        update = update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            _last_date=info[0],
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm-dd",
+            is_bd_pro=True,
+            is_free=True,
+            time_delta=6,
+            time_unit="months",
+            upstream_tasks=[info],
+        )
+
     with case(is_outdated, False):
         log_task(f"Não há atualizações para a tabela de {table_id}!")
 

From 8a1699ea02d0ece0658bc2d7e004e2ea213cab41 Mon Sep 17 00:00:00 2001
From: folhesgabriel
Date: Fri, 15 Sep 2023 12:26:27 -0300
Subject: [PATCH 2/6] register the flow

---
 pipelines/datasets/br_rf_cafir/flows.py | 2 +-
 pipelines/datasets/br_rf_cafir/tasks.py | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py
index d97924e80..c0d28369d 100644
--- a/pipelines/datasets/br_rf_cafir/flows.py
+++ b/pipelines/datasets/br_rf_cafir/flows.py
@@ -70,7 +70,7 @@
             is_free=True,
             time_delta=6,
             time_unit="months",
-            upstream_tasks=[info],
+            upstream_tasks=[is_outdated, info],
         )
 
     with case(is_outdated, False):
diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py
index 8358c5918..5beaa5d8f 100644
--- a/pipelines/datasets/br_rf_cafir/tasks.py
+++ b/pipelines/datasets/br_rf_cafir/tasks.py
@@ -155,8 +155,7 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) ->
     # save new file as csv
     df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8")
 
-    # work around ASCII 0 (NUL) when BigQuery reads the file
-    # read and save again
+    # work around ASCII 0 (NUL) when BigQuery reads the file. Read and save again.
     df = pd.read_csv(save_path, dtype=str)
     df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8")
 

From 7bb843c93be06fc0e5cec9dd3348807fc7d05a33 Mon Sep 17 00:00:00 2001
From: folhesgabriel
Date: Fri, 15 Sep 2023 12:59:11 -0300
Subject: [PATCH 3/6] create task that converts datetime to str

---
 pipelines/datasets/br_rf_cafir/flows.py |  4 +++-
 pipelines/datasets/br_rf_cafir/tasks.py | 13 +++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py
index c0d28369d..73262f2ec 100644
--- a/pipelines/datasets/br_rf_cafir/flows.py
+++ b/pipelines/datasets/br_rf_cafir/flows.py
@@ -16,6 +16,7 @@
     parse_files_parse_date,
     parse_data,
     check_if_bq_data_is_outdated,
+    convert_datetime_to_string,
 )
 
 from pipelines.utils.constants import constants as utils_constants
@@ -56,13 +57,14 @@
     is_outdated = check_if_bq_data_is_outdated(
         dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info]
     )
+    update_metadata_strig_date = convert_datetime_to_string(data=info[0])
 
     with case(update_metadata, True):
         update = update_django_metadata(
             dataset_id,
             table_id,
             metadata_type="DateTimeRange",
-            _last_date=info[0],
+            _last_date=update_metadata_strig_date,
             bq_last_update=False,
             api_mode="prod",
             date_format="yy-mm-dd",
diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py
index 5beaa5d8f..8c4b9b5b9 100644
--- a/pipelines/datasets/br_rf_cafir/tasks.py
+++ b/pipelines/datasets/br_rf_cafir/tasks.py
@@ -178,3 +178,16 @@ def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) ->
     )
 
     return files_path
+
+
+@task
+def convert_datetime_to_string(data: datetime):
+    """Convert a datetime to its string representation.
+
+    Args:
+        data (datetime): date to convert
+
+    Returns:
+        str: the date as a string
+    """
+    return str(data)

From 2a189bdf4f55b68842ae694c6ac42b2316d484fe Mon Sep 17 00:00:00 2001
From: folhesgabriel
Date: Fri, 15 Sep 2023 13:34:38 -0300
Subject: [PATCH 4/6] reorder upstream tasks

---
 pipelines/datasets/br_rf_cafir/flows.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py
index 73262f2ec..8303a07f4 100644
--- a/pipelines/datasets/br_rf_cafir/flows.py
+++ b/pipelines/datasets/br_rf_cafir/flows.py
@@ -57,7 +57,9 @@
     is_outdated = check_if_bq_data_is_outdated(
         dataset_id=dataset_id, table_id=table_id, data=info[0], upstream_tasks=[info]
     )
-    update_metadata_strig_date = convert_datetime_to_string(data=info[0])
+    update_metadata_strig_date = convert_datetime_to_string(
+        data=info[0], upstream_tasks=[info, is_outdated]
+    )
 
     with case(update_metadata, True):
         update = update_django_metadata(
@@ -72,7 +74,7 @@
             is_free=True,
             time_delta=6,
             time_unit="months",
-            upstream_tasks=[is_outdated, info],
+            upstream_tasks=[update_metadata_strig_date],
         )
 
     with case(is_outdated, False):

From a976d640c3a847f89e76113cf083248a920bfdcb Mon Sep 17 00:00:00 2001
From: folhesgabriel
Date: Fri, 15 Sep 2023 14:39:49 -0300
Subject: [PATCH 5/6] remove the updater from the free coverage
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pipelines/datasets/br_rf_cafir/flows.py | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py
index 8303a07f4..fbbbadda0 100644
--- a/pipelines/datasets/br_rf_cafir/flows.py
+++ b/pipelines/datasets/br_rf_cafir/flows.py
@@ -61,22 +61,6 @@
         data=info[0], upstream_tasks=[info, is_outdated]
     )
 
-    with case(update_metadata, True):
-        update = update_django_metadata(
-            dataset_id,
-            table_id,
-            metadata_type="DateTimeRange",
-            _last_date=update_metadata_strig_date,
-            bq_last_update=False,
-            api_mode="prod",
-            date_format="yy-mm-dd",
-            is_bd_pro=True,
-            is_free=True,
-            time_delta=6,
-            time_unit="months",
-            upstream_tasks=[update_metadata_strig_date],
-        )
-
     with case(is_outdated, False):
         log_task(f"Não há atualizações para a tabela de {table_id}!")
 
@@ -126,6 +110,10 @@
     wait_for_materialization.retry_delay = timedelta(
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
    )
+    # TODO: when the new snapshot (fotografia) is released, set is_free to True.
+    # It is not set now because the update_django_metadata task depends on a
+    # coverage already created in the API. Since the lag between snapshots is
+    # 5 months (6 is the current default), the coverage does not need updating yet.
 
     with case(update_metadata, True):
         update = update_django_metadata(

From 8a2a7d4f8a6237784601e772b36244272d69ac3c Mon Sep 17 00:00:00 2001
From: folhesgabriel
Date: Fri, 15 Sep 2023 14:48:43 -0300
Subject: [PATCH 6/6] pass the new input to the update_django_metadata function
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pipelines/datasets/br_rf_cafir/flows.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py
index fbbbadda0..63b9fdb60 100644
--- a/pipelines/datasets/br_rf_cafir/flows.py
+++ b/pipelines/datasets/br_rf_cafir/flows.py
@@ -120,7 +120,7 @@
             dataset_id,
             table_id,
             metadata_type="DateTimeRange",
-            _last_date=info[0],
+            _last_date=update_metadata_strig_date,
             bq_last_update=False,
             api_mode="prod",
             date_format="yy-mm-dd",
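
Net effect of the series: the flow converts the extraction date to a string in a dedicated convert_datetime_to_string task, ordered after the outdated check via upstream_tasks=[info, is_outdated], and feeds that string to the remaining update_django_metadata call as _last_date, while the free-coverage updater added in PATCH 1 is removed again in PATCH 5. The sketch below is a minimal, self-contained illustration of that wiring, not the production flow: it assumes Prefect 1.x (the API these flows use); the flow name, sample date, parameter default, and the pretend_update_django_metadata stand-in are made up, because the full signature of update_django_metadata is not visible in the hunks above. Only convert_datetime_to_string is copied from PATCH 3.

from datetime import datetime

from prefect import Flow, Parameter, case, task


@task
def convert_datetime_to_string(data: datetime) -> str:
    # Same conversion added in PATCH 3: the metadata update receives a plain
    # string for _last_date instead of a datetime object.
    return str(data)


@task
def pretend_update_django_metadata(_last_date: str) -> None:
    # Hypothetical stand-in for the repo's update_django_metadata task,
    # used here only to show how the string date is passed along.
    print(f"would update the DateTimeRange coverage with _last_date={_last_date}")


with Flow("toy_br_rf_cafir_metadata_update") as flow:
    update_metadata = Parameter("update_metadata", default=True)

    # In the real flow this value comes from parse_files_parse_date (info[0]).
    extraction_date = convert_datetime_to_string(data=datetime(2023, 9, 15))

    with case(update_metadata, True):
        pretend_update_django_metadata(_last_date=extraction_date)

if __name__ == "__main__":
    flow.run()

Making the conversion its own task, rather than calling str() inline, is presumably what lets PATCH 4 attach the upstream_tasks=[info, is_outdated] dependency to it, so the metadata update only runs after the outdated check has resolved.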