From 7a166c998e411f9bd41755e89c0fedd35a857082 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 16 Dec 2024 13:14:58 +0530 Subject: [PATCH] Address review feedback and write a separate DAG for AF 2.8 callback function --- cosmos/{helpers.py => io.py} | 22 ++++++++++++++++++++++ dev/dags/basic_cosmos_dag.py | 17 ----------------- dev/dags/example_operators.py | 15 ++++++++++++++- tests/test_example_dags.py | 2 +- 4 files changed, 37 insertions(+), 19 deletions(-) rename cosmos/{helpers.py => io.py} (85%) diff --git a/cosmos/helpers.py b/cosmos/io.py similarity index 85% rename from cosmos/helpers.py rename to cosmos/io.py index 66d32b6bb..7d2567371 100644 --- a/cosmos/helpers.py +++ b/cosmos/io.py @@ -12,6 +12,11 @@ def upload_artifacts_to_aws_s3(project_dir: str, **kwargs: Any) -> None: + """ + Helper function demonstrating how to upload artifacts to AWS S3 that can be used as a callback. + + :param project_dir: Path of the cloned project directory which Cosmos tasks work from. + """ from airflow.providers.amazon.aws.hooks.s3 import S3Hook target_dir = f"{project_dir}/target" @@ -38,6 +43,11 @@ def upload_artifacts_to_aws_s3(project_dir: str, **kwargs: Any) -> None: def upload_artifacts_to_gcp_gs(project_dir: str, **kwargs: Any) -> None: + """ + Helper function demonstrating how to upload artifacts to GCP GS that can be used as a callback. + + :param project_dir: Path of the cloned project directory which Cosmos tasks work from. + """ from airflow.providers.google.cloud.hooks.gcs import GCSHook target_dir = f"{project_dir}/target" @@ -63,6 +73,11 @@ def upload_artifacts_to_gcp_gs(project_dir: str, **kwargs: Any) -> None: def upload_artifacts_to_azure_wasb(project_dir: str, **kwargs: Any) -> None: + """ + Helper function demonstrating how to upload artifacts to Azure WASB that can be used as a callback. + + :param project_dir: Path of the cloned project directory which Cosmos tasks work from. + """ from airflow.providers.microsoft.azure.hooks.wasb import WasbHook target_dir = f"{project_dir}/target" @@ -146,6 +161,13 @@ def _construct_dest_file_path( def upload_artifacts_to_cloud_storage(project_dir: str, **kwargs: Any) -> None: + """ + Helper function demonstrating how to upload artifacts to remote blob stores that can be used as a callback. This is + an example of a helper function that can be used if on Airflow >= 2.8 and cosmos configurations like + ``remote_target_path`` and ``remote_target_path_conn_id`` when set can be leveraged. + + :param project_dir: Path of the cloned project directory which Cosmos tasks work from. + """ dest_target_dir, dest_conn_id = _configure_remote_target_path() if not dest_target_dir: diff --git a/dev/dags/basic_cosmos_dag.py b/dev/dags/basic_cosmos_dag.py index 6fb90d2fd..f71f351f0 100644 --- a/dev/dags/basic_cosmos_dag.py +++ b/dev/dags/basic_cosmos_dag.py @@ -7,7 +7,6 @@ from pathlib import Path from cosmos import DbtDag, ProfileConfig, ProjectConfig -from cosmos.helpers import upload_artifacts_to_cloud_storage from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -33,22 +32,6 @@ operator_args={ "install_deps": True, # install any necessary dependencies before running any dbt command "full_refresh": True, # used only in dbt commands that support this flag - # -------------------------------------------------------------- - # Callback function to upload artifacts using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above - "callback": upload_artifacts_to_cloud_storage, - # -------------------------------------------------------------- - # Callback function to upload artifacts to AWS S3 for Airflow < 2.8 - # "callback": upload_artifacts_to_aws_s3, - # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"}, - # -------------------------------------------------------------- - # Callback function to upload artifacts to GCP GS for Airflow < 2.8 - # "callback": upload_artifacts_to_gcp_gs, - # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"}, - # -------------------------------------------------------------- - # Callback function to upload artifacts to Azure WASB for Airflow < 2.8 - # "callback": upload_artifacts_to_azure_wasb, - # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"}, - # -------------------------------------------------------------- }, # normal dag parameters schedule_interval="@daily", diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py index fa66543e2..be1dc1947 100644 --- a/dev/dags/example_operators.py +++ b/dev/dags/example_operators.py @@ -5,7 +5,7 @@ from airflow import DAG from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig -from cosmos.helpers import upload_artifacts_to_aws_s3 +from cosmos.io import upload_artifacts_to_aws_s3 DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) @@ -20,6 +20,7 @@ ) with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag: + # [START single_operator_callback] seed_operator = DbtSeedLocalOperator( profile_config=profile_config, project_dir=DBT_PROJ_DIR, @@ -27,9 +28,21 @@ dbt_cmd_flags=["--select", "raw_customers"], install_deps=True, append_env=True, + # Callback function to upload artifacts to AWS S3 callback=upload_artifacts_to_aws_s3, callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"}, + # -------------------------------------------------------------- + # Callback function to upload artifacts to GCP GS + # "callback": upload_artifacts_to_gcp_gs, + # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"}, + # -------------------------------------------------------------- + # Callback function to upload artifacts to Azure WASB + # "callback": upload_artifacts_to_azure_wasb, + # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"}, + # -------------------------------------------------------------- ) + # [END single_operator_callback] + run_operator = DbtRunLocalOperator( profile_config=profile_config, project_dir=DBT_PROJ_DIR, diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 5d7a1a70d..762985b59 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -30,7 +30,7 @@ MIN_VER_DAG_FILE: dict[str, list[str]] = { "2.4": ["cosmos_seed_dag.py"], - "2.8": ["cosmos_manifest_example.py", "simple_dag_async.py"], + "2.8": ["cosmos_manifest_example.py", "simple_dag_async.py", "cosmos_callback_dag.py"], } IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py"]