Skip to content

Commit

Permalink
Merge branch 'main' into vertica-profile
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana authored Sep 27, 2023
2 parents 033f204 + 4c8d6b0 commit 08837b3
Show file tree
Hide file tree
Showing 25 changed files with 403 additions and 113 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ repos:
- id: remove-tabs
exclude: ^docs/make.bat$|^docs/Makefile$|^dev/dags/dbt/jaffle_shop/seeds/raw_orders.csv$
- repo: https://github.com/asottile/pyupgrade
rev: v3.10.1
rev: v3.13.0
hooks:
- id: pyupgrade
args:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.0.288
rev: v0.0.291
hooks:
- id: ruff
args:
Expand Down
11 changes: 7 additions & 4 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@

from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.constants import LoadMode, TestBehavior, ExecutionMode
from cosmos.operators.lazy_load import MissingPackage
from cosmos.config import (
ProjectConfig,
ProfileConfig,
ExecutionConfig,
RenderConfig,
)

from cosmos.constants import LoadMode, TestBehavior, ExecutionMode
from cosmos.log import get_logger
from cosmos.operators.lazy_load import MissingPackage
from cosmos.operators.local import (
DbtDepsLocalOperator,
DbtLSLocalOperator,
Expand All @@ -28,6 +28,8 @@
DbtTestLocalOperator,
)

logger = get_logger()

try:
from cosmos.operators.docker import (
DbtLSDockerOperator,
Expand Down Expand Up @@ -57,7 +59,8 @@
DbtSnapshotKubernetesOperator,
DbtTestKubernetesOperator,
)
except ImportError:
except ImportError as error:
logger.exception(error)
DbtLSKubernetesOperator = MissingPackage(
"cosmos.operators.kubernetes.DbtLSKubernetesOperator",
"kubernetes",
Expand Down
21 changes: 12 additions & 9 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def create_test_task_metadata(


def create_task_metadata(
node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_name_as_task_id_prefix: bool = True
node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_task_group: bool = False
) -> TaskMetadata | None:
"""
Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
Expand All @@ -106,9 +106,9 @@ def create_task_metadata(

if hasattr(node.resource_type, "value") and node.resource_type in dbt_resource_to_class:
if node.resource_type == DbtResourceType.MODEL:
if use_name_as_task_id_prefix:
task_id = f"{node.name}_run"
else:
task_id = f"{node.name}_run"

if use_task_group is True:
task_id = "run"
else:
task_id = f"{node.name}_{node.resource_type.value}"
Expand Down Expand Up @@ -167,14 +167,17 @@ def build_airflow_graph(
# The exception are the test nodes, since it would be too slow to run test tasks individually.
# If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup
for node_id, node in nodes.items():
use_task_group = (
node.resource_type == DbtResourceType.MODEL
and test_behavior == TestBehavior.AFTER_EACH
and node.has_test is True
)
task_meta = create_task_metadata(
node=node,
execution_mode=execution_mode,
args=task_args,
use_name_as_task_id_prefix=test_behavior != TestBehavior.AFTER_EACH,
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
)

if task_meta and node.resource_type != DbtResourceType.TEST:
if node.resource_type == DbtResourceType.MODEL and test_behavior == TestBehavior.AFTER_EACH:
if use_task_group is True:
with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
task = create_airflow_task(task_meta, dag, task_group=model_task_group)
test_meta = create_test_task_metadata(
Expand Down
2 changes: 1 addition & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def validate_project(self) -> None:
}
for name, path in mandatory_paths.items():
if path is None or not Path(path).exists():
raise CosmosValueError(f"Could not find {name} at {project_yml_path}")
raise CosmosValueError(f"Could not find {name} at {path}")

def is_manifest_available(self) -> bool:
"""
Expand Down
1 change: 0 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def __init__(
"profile_config": profile_config,
"emit_datasets": emit_datasets,
}

if dbt_executable_path:
task_args["dbt_executable_path"] = dbt_executable_path

Expand Down
42 changes: 33 additions & 9 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class DbtNode:
file_path: Path
tags: list[str] = field(default_factory=lambda: [])
config: dict[str, Any] = field(default_factory=lambda: {})
has_test: bool = False


class DbtGraph:
Expand Down Expand Up @@ -193,9 +194,10 @@ def load_via_dbt_ls(self) -> None:
env=env,
)
stdout, stderr = process.communicate()
returncode = process.returncode
logger.debug("dbt deps output: %s", stdout)

if stderr or "Error" in stdout:
if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}")

Expand All @@ -222,6 +224,7 @@ def load_via_dbt_ls(self) -> None:
)

stdout, stderr = process.communicate()
returncode = process.returncode

logger.debug("dbt output: %s", stdout)
log_filepath = log_dir / DBT_LOG_FILENAME
Expand All @@ -231,14 +234,14 @@ def load_via_dbt_ls(self) -> None:
for line in logfile:
logger.debug(line.strip())

if stderr or "Error" in stdout:
if 'Run "dbt deps" to install package dependencies' in stdout:
raise CosmosLoadDbtException(
"Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
)
else:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}")
if 'Run "dbt deps" to install package dependencies' in stdout:
raise CosmosLoadDbtException(
"Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
)

if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}")

nodes = {}
for line in stdout.split("\n"):
Expand All @@ -262,6 +265,8 @@ def load_via_dbt_ls(self) -> None:
self.nodes = nodes
self.filtered_nodes = nodes

self.update_node_dependency()

logger.info("Total nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.nodes))

Expand Down Expand Up @@ -306,6 +311,8 @@ def load_via_custom_parser(self) -> None:
project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude
)

self.update_node_dependency()

logger.info("Total nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.nodes))

Expand Down Expand Up @@ -335,11 +342,28 @@ def load_from_dbt_manifest(self) -> None:
tags=node_dict["tags"],
config=node_dict["config"],
)

nodes[node.unique_id] = node

self.nodes = nodes
self.filtered_nodes = select_nodes(
project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude
)

self.update_node_dependency()

logger.info("Total nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.nodes))

def update_node_dependency(self) -> None:
    """
    Update the property `has_test` on every node that a dbt test depends on.

    Test nodes list the nodes they exercise in `depends_on`; for each such
    upstream node that is still present after filtering, mark it so a test
    task can be attached to it when the Airflow graph is rendered.

    Updates in-place:
    * self.filtered_nodes
    """
    # Fixed docstring typo ("has_text") and iterate values() directly since
    # the dict key is never used.
    for node in self.filtered_nodes.values():
        if node.resource_type == DbtResourceType.TEST:
            for node_id in node.depends_on:
                # A test may depend on a node that was filtered out; skip those.
                if node_id in self.filtered_nodes:
                    self.filtered_nodes[node_id].has_test = True
3 changes: 3 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DbtBaseOperator(BaseOperator):
:param cache_selected_only:
:param no_version_check: dbt optional argument - If set, skip ensuring dbt's version matches the one specified in
the dbt_project.yml file ('require-dbt-version')
:param emit_datasets: Enable emitting inlets and outlets during task execution
:param fail_fast: dbt optional argument to make dbt exit immediately if a single resource fails to build.
:param quiet: dbt optional argument to show only error logs in stdout
:param warn_error: dbt optional argument to convert dbt warnings into errors
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(
selector: str | None = None,
vars: dict[str, str] | None = None,
models: str | None = None,
emit_datasets: bool = True,
cache_selected_only: bool = False,
no_version_check: bool = False,
fail_fast: bool = False,
Expand All @@ -112,6 +114,7 @@ def __init__(
self.selector = selector
self.vars = vars
self.models = models
self.emit_datasets = emit_datasets
self.cache_selected_only = cache_selected_only
self.no_version_check = no_version_check
self.fail_fast = fail_fast
Expand Down
18 changes: 13 additions & 5 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@

logger = get_logger(__name__)

# kubernetes is an optional dependency, so we need to check if it's installed
try:
# apache-airflow-providers-cncf-kubernetes >= 7.4.0
from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
convert_env_vars,
)
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
except ImportError:
raise ImportError(
"Could not import KubernetesPodOperator. Ensure you've installed the Kubernetes provider "
"separately or with with `pip install astronomer-cosmos[...,kubernetes]`."
)
try:
# apache-airflow-providers-cncf-kubernetes < 7.4.0
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
except ImportError as error:
logger.exception(error)
raise ImportError(
"Could not import KubernetesPodOperator. Ensure you've installed the Kubernetes provider "
"separately or with with `pip install astronomer-cosmos[...,kubernetes]`."
)


class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type: ignore
Expand Down Expand Up @@ -70,6 +75,9 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None)
if self.profile_config.target_name:
dbt_cmd.extend(["--target", self.profile_config.target_name])

if self.project_dir:
dbt_cmd.extend(["--project-dir", str(self.project_dir)])

# set env vars
self.build_env_args(env_vars)
self.arguments = dbt_cmd
Expand Down
33 changes: 12 additions & 21 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
FullOutputSubprocessHook,
FullOutputSubprocessResult,
)
from cosmos.dbt.parser.output import (
extract_log_issues,
)
from cosmos.dbt.parser.output import extract_log_issues, parse_output

logger = get_logger(__name__)

Expand Down Expand Up @@ -82,6 +80,7 @@ class DbtLocalBaseOperator(DbtBaseOperator):
:param profile_name: A name to use for the dbt profile. If not provided, and no profile target is found
in your project's dbt_project.yml, "cosmos_profile" is used.
:param install_deps: If true, install dependencies before running the command
:param emit_datasets: If true, the operator will set inlets and outlets
:param callback: A callback function called on after a dbt run with a path to the dbt project directory.
:param target_name: A name to use for the dbt target. If not provided, and no target is found
in your project's dbt_project.yml, "cosmos_target" is used.
Expand All @@ -99,15 +98,13 @@ def __init__(
install_deps: bool = False,
callback: Callable[[str], None] | None = None,
should_store_compiled_sql: bool = True,
emit_datasets: bool = True,
**kwargs: Any,
) -> None:
self.profile_config = profile_config
self.install_deps = install_deps
self.callback = callback
self.compiled_sql = ""
self.should_store_compiled_sql = should_store_compiled_sql
self.emit_datasets = emit_datasets
self.openlineage_events_completes: list[RunEvent] = []
super().__init__(**kwargs)

Expand Down Expand Up @@ -277,7 +274,7 @@ def calculate_openlineage_events_completes(
try:
events = openlineage_processor.parse()
self.openlineage_events_completes = events.completes
except (FileNotFoundError, NotImplementedError):
except (FileNotFoundError, NotImplementedError, ValueError):
logger.debug("Unable to parse OpenLineage events", stack_info=True)

def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
Expand Down Expand Up @@ -350,11 +347,12 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
job_facets=job_facets,
)

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None:
def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> FullOutputSubprocessResult:
    """
    Build the dbt command line and run it in a subprocess.

    :param context: Airflow task context, forwarded to ``build_cmd`` and ``run_command``.
    :param cmd_flags: Optional extra command-line flags for the dbt command.
    :returns: The full subprocess result, so callers (e.g. the test operator's
        ``execute``) can inspect the captured dbt output.
    """
    dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags)
    dbt_cmd = dbt_cmd or []
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
    logger.info(result.output)
    return result

def execute(self, context: Context) -> None:
self.build_and_run_cmd(context=context)
Expand Down Expand Up @@ -461,20 +459,6 @@ def __init__(
self.base_cmd = ["test"]
self.on_warning_callback = on_warning_callback

def _should_run_tests(
    self,
    result: FullOutputSubprocessResult,
    no_tests_message: str = "Nothing to do",
) -> bool:
    """
    Decide whether the warning-handling path should run for this dbt invocation.

    Returns True only when a warning callback is configured and the dbt output
    does not contain the "no tests selected" marker message.

    :param result: The output from the build and run command.
    :param no_tests_message: Substring dbt emits when there are no tests to run.
    """
    if self.on_warning_callback is None:
        return False
    return no_tests_message not in result.output

def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) -> None:
"""
Handles warnings by extracting log issues, creating additional context, and calling the
Expand All @@ -492,6 +476,13 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context)
if self.on_warning_callback:
self.on_warning_callback(warning_context)

def execute(self, context: Context) -> None:
    """
    Run the dbt test command and, when warnings are present in the output,
    invoke the configured warning handler.

    :param context: Airflow task context for this execution.
    """
    result = self.build_and_run_cmd(context=context)
    # Bail out early unless a callback is configured and the output mentions warnings.
    if not self.on_warning_callback or "WARN" not in result.output:
        return
    if parse_output(result, "WARN") > 0:
        self._handle_warnings(result, context)


class DbtRunOperationLocalOperator(DbtLocalBaseOperator):
"""
Expand Down
Binary file modified docs/_static/jaffle_shop_k8s_dag_run.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 08837b3

Please sign in to comment.