diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py
index 475e4e66c..a46473c3c 100644
--- a/dev/dags/example_virtualenv.py
+++ b/dev/dags/example_virtualenv.py
@@ -5,7 +5,11 @@
 from datetime import datetime
 from pathlib import Path
 
-from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
+from airflow.decorators import dag
+from airflow.configuration import get_airflow_home
+from airflow.operators.empty import EmptyOperator
+
+from cosmos import DbtTaskGroup, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
 from cosmos.profiles import PostgresUserPasswordProfileMapping
 
 DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
@@ -21,29 +25,57 @@
     ),
 )
 
-# [START virtualenv_example]
-example_virtualenv = DbtDag(
-    # dbt/cosmos-specific parameters
-    project_config=ProjectConfig(
-        DBT_ROOT_PATH / "jaffle_shop",
-    ),
-    profile_config=profile_config,
-    execution_config=ExecutionConfig(
-        execution_mode=ExecutionMode.VIRTUALENV,
-        # We can enable this flag if we want Airflow to create one virtualenv
-        # and reuse that within the whole DAG.
-        # virtualenv_dir=f"{get_airflow_home()}/persistent-venv",
-    ),
-    operator_args={
-        "py_system_site_packages": False,
-        "py_requirements": ["dbt-postgres==1.6.0b1"],
-        "install_deps": True,
-    },
-    # normal dag parameters
+@dag(
     schedule_interval="@daily",
     start_date=datetime(2023, 1, 1),
     catchup=False,
-    dag_id="example_virtualenv",
-    default_args={"retries": 2},
 )
+def example_virtualenv() -> None:
+    start_task = EmptyOperator(task_id="start-venv-examples")
+    end_task = EmptyOperator(task_id="end-venv-examples")
+
+    tmp_venv_task_group = DbtTaskGroup(
+        group_id="tmp-venv-group",
+        # dbt/cosmos-specific parameters
+        project_config=ProjectConfig(
+            DBT_ROOT_PATH / "jaffle_shop",
+        ),
+        profile_config=profile_config,
+        execution_config=ExecutionConfig(
+            execution_mode=ExecutionMode.VIRTUALENV,
+            # We can enable this flag if we want Airflow to create one virtualenv
+            # and reuse it within the whole DAG.
+            # virtualenv_dir=f"{get_airflow_home()}/persistent-venv",
+        ),
+        operator_args={
+            "py_system_site_packages": False,
+            "py_requirements": ["dbt-postgres==1.6.0b1"],
+            "install_deps": True,
+        },
+    )
+
+    cached_venv_task_group = DbtTaskGroup(
+        group_id="cached-venv-group",
+        # dbt/cosmos-specific parameters
+        project_config=ProjectConfig(
+            DBT_ROOT_PATH / "jaffle_shop",
+        ),
+        profile_config=profile_config,
+        execution_config=ExecutionConfig(
+            execution_mode=ExecutionMode.VIRTUALENV,
+            # Setting virtualenv_dir makes Airflow create one virtualenv
+            # and reuse it for every dbt task in this group.
+            virtualenv_dir=Path(f"{get_airflow_home()}/persistent-venv"),
+        ),
+        operator_args={
+            "py_system_site_packages": False,
+            "py_requirements": ["dbt-postgres==1.6.0b1"],
+            "install_deps": True,
+        },
+    )
+
+    start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task
+
+example_virtualenv()
 # [END virtualenv_example]
+
diff --git a/tests/test_converter.py b/tests/test_converter.py
index 823eff460..5e81aaccf 100644
--- a/tests/test_converter.py
+++ b/tests/test_converter.py
@@ -81,7 +81,8 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op
 @patch("cosmos.converter.DbtGraph.load")
 def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualenv_dir, operator_args, caplog):
     """
-    This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` andm still pass a defined `virtualenv_dir`
+    This test will raise a warning if we try to use ExecutionMode != `VirtualEnv`
+    and still pass a defined `virtualenv_dir`.
     """
     project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
     execution_config = ExecutionConfig(execution_mode=execution_mode, virtualenv_dir=virtualenv_dir)
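
Note: as the updated docstring above describes, `virtualenv_dir` is only meaningful together with `ExecutionMode.VIRTUALENV`, and the converter warns when it is combined with any other execution mode. A minimal sketch of an `ExecutionConfig` that would trigger that warning path (the `ExecutionMode.LOCAL` choice and the `/tmp/persistent-venv` path are illustrative only, not part of the diff):

    from pathlib import Path

    from cosmos import ExecutionConfig, ExecutionMode

    # virtualenv_dir is only used with ExecutionMode.VIRTUALENV; pairing it with
    # another mode (LOCAL here, purely for illustration) is the mismatch that
    # test_converter_raises_warning expects to be reported as a warning.
    execution_config = ExecutionConfig(
        execution_mode=ExecutionMode.LOCAL,
        virtualenv_dir=Path("/tmp/persistent-venv"),
    )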