From 3eb86ea26fbd7988b631747ffd16586effb11a73 Mon Sep 17 00:00:00 2001 From: tricktx Date: Tue, 12 Nov 2024 19:32:59 -0300 Subject: [PATCH] register flow part 3 --- pipelines/datasets/br_anatel_banda_larga_fixa/flows.py | 1 - pipelines/datasets/br_anatel_telefonia_movel/flows.py | 1 - pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py | 1 - pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py | 3 ++- pipelines/utils/crawler_anatel/telefonia_movel/flows.py | 8 +++----- 5 files changed, 5 insertions(+), 9 deletions(-) diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py index 4f14adcfa..0b6966dd6 100644 --- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py +++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py @@ -15,7 +15,6 @@ ) from pipelines.utils.crawler_anatel.banda_larga_fixa.flows import flow_anatel_banda_larga_fixa - # ? Microdados br_anatel_banda_larga_fixa__microdados = deepcopy(flow_anatel_banda_larga_fixa) br_anatel_banda_larga_fixa__microdados.name = "br_anatel_banda_larga_fixa.microdados" diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py index 3a24008ce..15f01af57 100644 --- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py +++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py @@ -15,7 +15,6 @@ schedule_br_anatel_telefonia_movel__uf) from pipelines.utils.crawler_anatel.telefonia_movel.flows import flow_anatel_telefonia_movel - # ? -------------------------------> Microdados br_anatel_telefonia_movel__microdados = deepcopy(flow_anatel_telefonia_movel) br_anatel_telefonia_movel__microdados.name = "br_anatel_telefonia_movel.microdados" diff --git a/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py b/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py index b4c5a9391..7e01ce966 100644 --- a/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py +++ b/pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py @@ -70,7 +70,6 @@ wait=filepath, upstream_tasks=[filepath], # Fix: Wrap filepath in a list to make it iterable ) - with case(materialize_after_dump, True): # Trigger DBT flow run current_flow_labels = get_current_flow_labels() diff --git a/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py b/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py index 6b40e2559..f8912426c 100644 --- a/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py +++ b/pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py @@ -91,6 +91,7 @@ def check_and_create_column(df: pd.DataFrame, col_name: str) -> pd.DataFrame: # ! Pandas DataFrame: O DataFrame modificado. """ + if col_name not in df.columns: df[col_name] = "" return df @@ -225,4 +226,4 @@ def get_year(): max_year = max(lista) log(f"Ano máximo: {max_year}") - return max_year + return max_year \ No newline at end of file diff --git a/pipelines/utils/crawler_anatel/telefonia_movel/flows.py b/pipelines/utils/crawler_anatel/telefonia_movel/flows.py index 80c466bb8..e8a7d0ffb 100644 --- a/pipelines/utils/crawler_anatel/telefonia_movel/flows.py +++ b/pipelines/utils/crawler_anatel/telefonia_movel/flows.py @@ -43,11 +43,9 @@ "materialize_after_dump", default=True, required=False ) dbt_alias = Parameter("dbt_alias", default=True, required=False) - year = get_year_and_unzip() - semestre = get_semester() - ano = Parameter("ano", default=year, required=False) - semestre = Parameter("semestre", default=semestre, required=False) + ano = Parameter("ano", default=get_year_and_unzip(), required=False) + semestre = Parameter("semestre", default=get_semester(), required=False) update_metadata = Parameter("update_metadata", default=True, required=False) @@ -130,4 +128,4 @@ ) flow_anatel_telefonia_movel.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) \ No newline at end of file +flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)