diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index adf608643..4888583bb 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -4,6 +4,7 @@ import shutil import signal import tempfile +from attr import define from pathlib import Path from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING @@ -53,12 +54,24 @@ from openlineage.airflow.extractors.base import OperatorLineage except (ImportError, ModuleNotFoundError) as error: logger.warning( - "To enable emitting Openlineage events. In order to use openlineage, upgrade to Airflow 2.7 or " - "install astronomer-cosmos[openlineage]." + "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]." ) logger.exception(error) is_openlineage_available = False + @define + class OperatorLineage: # type: ignore + inputs: list[str] = list() + outputs: list[str] = list() + run_facets: dict[str, str] = dict() + job_facets: dict[str, str] = dict() + + +try: + LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") +except airflow.exceptions.AirflowConfigException: + LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) + class DbtLocalBaseOperator(DbtBaseOperator): """ @@ -251,15 +264,9 @@ def calculate_openlineage_events_completes( for key, value in env.items(): os.environ[key] = str(value) - lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) - try: - lineage_namespace = conf.get("openlineage", "namespace") - except airflow.exceptions.AirflowConfigException: - pass - openlineage_processor = DbtLocalArtifactProcessor( producer=OPENLINEAGE_PRODUCER, - job_namespace=lineage_namespace, + job_namespace=LINEAGE_NAMESPACE, project_dir=project_dir, profile_name=self.profile_config.profile_name, target=self.profile_config.target_name, @@ -270,8 +277,8 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except (FileNotFoundError, NotImplementedError) as error: - logger.exception(error) + except (FileNotFoundError, NotImplementedError): + logger.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: """ @@ -309,17 +316,32 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope """ Collect the input, output, job and run facets for this operator. It relies on the calculate_openlineage_events_completes having being called before. + + This method is called by Openlineage even if `execute` fails, because `get_openlineage_facets_on_failure` + is not implemented. """ + inputs = [] outputs = [] run_facets: dict[str, Any] = {} job_facets: dict[str, Any] = {} - for completed in task_instance.openlineage_events_completes: - [inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore - [outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore - run_facets = {**run_facets, **completed.run.facets} - job_facets = {**job_facets, **completed.job.facets} + openlineage_events_completes = None + if hasattr(self, "openlineage_events_completes"): + openlineage_events_completes = self.openlineage_events_completes + elif hasattr(task_instance, "openlineage_events_completes"): + openlineage_events_completes = task_instance.openlineage_events_completes + else: + logger.info("Unable to emit OpenLineage events due to lack of data.") + + if openlineage_events_completes is not None: + for completed in openlineage_events_completes: + [inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore + [outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore + run_facets = {**run_facets, **completed.run.facets} + job_facets = {**job_facets, **completed.job.facets} + else: + logger.info("Unable to emit OpenLineage events due to lack of dependencies or data.") return OperatorLineage( inputs=inputs, diff --git a/pyproject.toml b/pyproject.toml index 81633fdad..dd88d2d15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,13 +35,13 @@ classifiers = [ ] dependencies = [ # Airflow & Pydantic issue: https://github.com/apache/airflow/issues/32311 + "attrs", "pydantic>=1.10.0,<2.0.0", "apache-airflow>=2.3.0", "importlib-metadata; python_version < '3.8'", "Jinja2>=3.0.0", "typing-extensions; python_version < '3.8'", "virtualenv", - "openlineage-integration-common", ] [project.optional-dependencies] @@ -76,6 +76,7 @@ dbt-spark = [ "dbt-spark<=1.5.4", ] openlineage = [ + "openlineage-integration-common", "openlineage-airflow", ] all = [ @@ -134,7 +135,6 @@ dependencies = [ "apache-airflow-providers-cncf-kubernetes>=5.1.1,<7.3.0", "types-PyYAML", "types-attrs", - "attrs", "types-requests", "types-python-dateutil", "apache-airflow" diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index f926816f6..94b0f8e27 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -211,6 +211,23 @@ class MockEvent: assert facets.job_facets == {"d": 4} +def test_run_operator_emits_events_without_openlineage_events_completes(caplog): + dbt_base_operator = DbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + should_store_compiled_sql=False, + ) + delattr(dbt_base_operator, "openlineage_events_completes") + facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator) + assert facets.inputs == [] + assert facets.outputs == [] + assert facets.run_facets == {} + assert facets.job_facets == {} + log = "Unable to emit OpenLineage events due to lack of dependencies or data." + assert log in caplog.text + + def test_store_compiled_sql() -> None: dbt_base_operator = DbtLocalBaseOperator( profile_config=profile_config,