From 5caf6a869d0c8207e7520cf012b06497ee952fff Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 06:33:44 +0000 Subject: [PATCH] Force to fetch the actual dag, add more debug logs --- cosmos/listeners/dag_run_listener.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 762c3dbf3..b382b8210 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -51,18 +51,17 @@ def uses_cosmos(dag: DAG) -> bool: def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_success was called") - dag = dag_run.get_dag() + # The following is an airflow.serialization.serialized_objects.SerializedDAG instance + serialized_dag = dag_run.get_dag() + serialized_dag - logger.info(f"dir: {dir(dag)}") - logger.info(f"1: {dag.fileloc}") - logger.info(f"2:{dag.filepath}") - logger.info(f"3: {dag.task_dict}") - logger.info(f"4: {dag.task_group_dict}") - logger.info(f"5: {dag.get_serialized_fields()}") - logger.info(f"6: {dag.to_dict()}") - logger.info(f"7: {dag.deserialize()}") + logger.info(f"dir: {dir(serialized_dag)}") + logger.info(f"1: {serialized_dag.fileloc}") + logger.info(f"2:{serialized_dag.filepath}") + logger.info(f"3: {serialized_dag.task_dict}") + logger.info(f"4: {serialized_dag.task_group_dict}") - if not isinstance(dag, DAG): + if not isinstance(serialized_dag, DAG): from airflow.models import DagBag logger.info("The DAG does not use Cosmos") @@ -70,8 +69,11 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: dag = dag_bag.get_dag(dag_run.dag_id) if not uses_cosmos(dag): + logger.info(f"5: {serialized_dag.deserialize()}") return + logger.info(f"5: {serialized_dag.deserialize()}") + additional_telemetry_metrics = { "dag_hash": dag_run.dag_hash, "status": EventStatus.SUCCESS,