diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py index b79d0f5b3..345034402 100644 --- a/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py +++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py @@ -27,6 +27,7 @@ download_files, merge_and_clean_data, make_partitions, + table_is_available, ) from pipelines.datasets.br_cgu_servidores_executivo_federal.constants import ( constants as cgu_constants, @@ -78,62 +79,167 @@ ) log_task("Partitions done") - # create_table_and_upload_to_gcs( - # data_path=outputs_path_by_table["aposentados_cadastro"], - # dataset_id=dataset_id, - # table_id="aposentados_cadastro", - # dump_mode="append", - # wait=outputs_path_by_table, - # ) - - # create_table_and_upload_to_gcs( - # data_path=outputs_path_by_table["pensionistas_cadastro"], - # dataset_id=dataset_id, - # table_id="pensionistas_cadastro", - # dump_mode="append", - # wait=outputs_path_by_table, - # ) - - create_table_and_upload_to_gcs( - data_path=outputs_path_by_table["servidores_cadastro"], - dataset_id=dataset_id, - table_id="servidores_cadastro", - dump_mode="append", - wait=outputs_path_by_table, - ) + with case(table_is_available(outputs_path_by_table, "aposentados_cadastro"), True): + create_table_and_upload_to_gcs( + data_path=outputs_path_by_table["aposentados_cadastro"], + dataset_id=dataset_id, + table_id="aposentados_cadastro", + dump_mode="append", + wait=outputs_path_by_table, + ) + + with case(table_is_available(outputs_path_by_table, "pensionistas_cadastro"), True): + create_table_and_upload_to_gcs( + data_path=outputs_path_by_table["pensionistas_cadastro"], + dataset_id=dataset_id, + table_id="pensionistas_cadastro", + dump_mode="append", + wait=outputs_path_by_table, + ) + + with case(table_is_available(outputs_path_by_table, "servidores_cadastro"), True): + create_table_and_upload_to_gcs( + 
data_path=outputs_path_by_table["servidores_cadastro"], + dataset_id=dataset_id, + table_id="servidores_cadastro", + dump_mode="append", + wait=outputs_path_by_table, + ) + + with case( + table_is_available(outputs_path_by_table, "reserva_reforma_militares_cadastro"), + True, + ): + create_table_and_upload_to_gcs( + data_path=outputs_path_by_table["reserva_reforma_militares_cadastro"], + dataset_id=dataset_id, + table_id="reserva_reforma_militares_cadastro", + dump_mode="append", + wait=outputs_path_by_table, + ) + + with case(table_is_available(outputs_path_by_table, "remuneracao"), True): + create_table_and_upload_to_gcs( + data_path=outputs_path_by_table["remuneracao"], + dataset_id=dataset_id, + table_id="remuneracao", + dump_mode="append", + wait=outputs_path_by_table, + ) + + with case(table_is_available(outputs_path_by_table, "afastamentos"), True): + create_table_and_upload_to_gcs( + data_path=outputs_path_by_table["afastamentos"], + dataset_id=dataset_id, + table_id="afastamentos", + dump_mode="append", + wait=outputs_path_by_table, + ) + + with case(table_is_available(outputs_path_by_table, "observacoes"), True): + create_table_and_upload_to_gcs( + data_path=outputs_path_by_table["observacoes"], + dataset_id=dataset_id, + table_id="observacoes", + dump_mode="append", + wait=outputs_path_by_table, + ) + + # aposentados_cadastro + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": "aposentados_cadastro", + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.aposentados_cadastro", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + 
raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = datetime.timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id="aposentados_cadastro", + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=True, + billing_project_id="basedosdados", + api_mode="dev", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[wait_for_materialization], + ) + + # pensionistas_cadastro + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": "pensionistas_cadastro", + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.pensionistas_cadastro", + ) - # create_table_and_upload_to_gcs( - # data_path=outputs_path_by_table["reserva_reforma_militares_cadastro"], - # dataset_id=dataset_id, - # table_id="reserva_reforma_militares_cadastro", - # dump_mode="append", - # wait=outputs_path_by_table, - # ) - - # create_table_and_upload_to_gcs( - # data_path=outputs_path_by_table["remuneracao"], - # dataset_id=dataset_id, - # table_id="remuneracao", - # dump_mode="append", - # wait=outputs_path_by_table, - # ) - - # create_table_and_upload_to_gcs( - # data_path=outputs_path_by_table["afastamentos"], - # dataset_id=dataset_id, - # table_id="afastamentos", - # dump_mode="append", - # wait=outputs_path_by_table, - # ) - - # create_table_and_upload_to_gcs( - # data_path=outputs_path_by_table["observacoes"], - # 
dataset_id=dataset_id, - # table_id="observacoes", - # dump_mode="append", - # wait=outputs_path_by_table, - # ) + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = datetime.timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id="pensionistas_cadastro", + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=True, + billing_project_id="basedosdados", + api_mode="dev", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[wait_for_materialization], + ) + # servidores_cadastro with case(materialize_after_dump, True): # Trigger DBT flow run current_flow_labels = get_current_flow_labels() @@ -147,7 +253,195 @@ "dbt_alias": dbt_alias, }, labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.servidores_cadastro", + run_name=f"Materialize {dataset_id}.servidores_cadastro", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = datetime.timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id="servidores_cadastro", + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=True, + billing_project_id="basedosdados", + api_mode="dev", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", 
+ upstream_tasks=[wait_for_materialization], + ) + + # reserva_reforma_militares_cadastro + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": "reserva_reforma_militares_cadastro", + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.reserva_reforma_militares_cadastro", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = datetime.timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id="reserva_reforma_militares_cadastro", + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=True, + billing_project_id="basedosdados", + api_mode="dev", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[wait_for_materialization], + ) + + # remuneracao + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": "remuneracao", + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.remuneracao", + ) + + wait_for_materialization = 
wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = datetime.timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id="remuneracao", + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=True, + billing_project_id="basedosdados", + api_mode="dev", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[wait_for_materialization], + ) + + # afastamentos + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": "afastamentos", + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.afastamentos", + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = datetime.timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(update_metadata, True): + update_django_metadata( + dataset_id, + table_id="afastamentos", + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=True, + billing_project_id="basedosdados", + api_mode="dev", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + 
time_delta=6, + time_unit="months", + upstream_tasks=[wait_for_materialization], + ) + + # observacoes + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": "observacoes", + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.observacoes", ) wait_for_materialization = wait_for_flow_run( @@ -166,7 +460,7 @@ with case(update_metadata, True): update_django_metadata( dataset_id, - table_id, + table_id="observacoes", metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, diff --git a/pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py b/pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py index 1e5b44a9f..e5ae352c7 100644 --- a/pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py +++ b/pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py @@ -115,3 +115,12 @@ def make_partitions(tables: list[tuple[str, pd.DataFrame]]) -> dict[str, str]: return { table_name: f"{output}/{table_name}" for table_name, df in tables if len(df) > 0 } + + +@task +def table_is_available(tables: dict[str, str], table: str) -> bool: + available = table in tables + + log(f"{table=} {available=} in {tables.keys()=}") + + return available