diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index f8020e513..762c3dbf3 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -52,16 +52,24 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_success was called") dag = dag_run.get_dag() + logger.info(f"dir: {dir(dag)}") logger.info(f"1: {dag.fileloc}") logger.info(f"2:{dag.filepath}") logger.info(f"3: {dag.task_dict}") logger.info(f"4: {dag.task_group_dict}") - logger.info(f"5: {dag.serialize_to_json()}") - logger.info(f"6: {dag.deserialize_dag()}") + logger.info(f"5: {dag.get_serialized_fields()}") + logger.info(f"6: {dag.to_dict()}") + logger.info(f"7: {dag.deserialize()}") + + if not isinstance(dag, DAG): + from airflow.models import DagBag - if not uses_cosmos(dag): logger.info("The DAG does not use Cosmos") + dag_bag = DagBag(dag_folder=dag.fileloc, include_examples=False) + dag = dag_bag.get_dag(dag_run.dag_id) + + if not uses_cosmos(dag): return additional_telemetry_metrics = {