Commit
Merge branch 'main' into add-security-policy
tatiana authored Dec 17, 2024
2 parents 91fe4f5 + 0000f80 commit a997814
Showing 18 changed files with 671 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -57,7 +57,7 @@ repos:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.2
rev: v0.8.3
hooks:
- id: ruff
args:
4 changes: 4 additions & 0 deletions cosmos/airflow/graph.py
@@ -99,6 +99,7 @@ def create_test_task_metadata(
extra_context = {}

task_owner = ""
airflow_task_config = {}
if test_indirect_selection != TestIndirectSelection.EAGER:
task_args["indirect_selection"] = test_indirect_selection.value
if node is not None:
@@ -111,6 +112,7 @@ def create_test_task_metadata(

extra_context = {"dbt_node_config": node.context_dict}
task_owner = node.owner
airflow_task_config = node.airflow_task_config

elif render_config is not None: # TestBehavior.AFTER_ALL
task_args["select"] = render_config.select
@@ -120,6 +122,7 @@ def create_test_task_metadata(
return TaskMetadata(
id=test_task_name,
owner=task_owner,
airflow_task_config=airflow_task_config,
operator_class=calculate_operator_class(
execution_mode=execution_mode,
dbt_class="DbtTest",
@@ -214,6 +217,7 @@ def create_task_metadata(
task_metadata = TaskMetadata(
id=task_id,
owner=node.owner,
airflow_task_config=node.airflow_task_config,
operator_class=calculate_operator_class(
execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type]
),
1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -10,6 +10,7 @@
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos"
DEFAULT_TARGET_PATH = "target"
DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH"
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
3 changes: 3 additions & 0 deletions cosmos/core/airflow.py
@@ -32,6 +32,9 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None)
if task.owner != "":
task_kwargs["owner"] = task.owner

for k, v in task.airflow_task_config.items():
task_kwargs[k] = v

airflow_task = Operator(
task_id=task.id,
dag=dag,
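
As a minimal sketch of what the new loop in get_airflow_task does, the snippet below mirrors the kwargs merge with illustrative values; the keys and names are hypothetical, not taken from this repository.

# Sketch of the kwargs merge performed by the new loop in get_airflow_task();
# the keys and values below are illustrative.
airflow_task_config = {"pool": "dbt_pool", "pool_slots": 2, "queue": "analytics"}

task_kwargs = {"owner": "data-team"}        # built earlier in get_airflow_task()
for k, v in airflow_task_config.items():    # same loop as in the diff above
    task_kwargs[k] = v

# task_kwargs is then forwarded to the operator constructor:
# Operator(task_id=task.id, dag=dag, task_group=task_group, **task_kwargs)
print(task_kwargs)
# {'owner': 'data-team', 'pool': 'dbt_pool', 'pool_slots': 2, 'queue': 'analytics'}
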
1 change: 1 addition & 0 deletions cosmos/core/graph/entities.py
@@ -58,6 +58,7 @@ class Task(CosmosEntity):
"""

owner: str = ""
airflow_task_config: Dict[str, Any] = field(default_factory=dict)
operator_class: str = "airflow.operators.empty.EmptyOperator"
arguments: Dict[str, Any] = field(default_factory=dict)
extra_context: Dict[str, Any] = field(default_factory=dict)
30 changes: 29 additions & 1 deletion cosmos/dbt/graph.py
@@ -13,7 +13,7 @@
from functools import cached_property
from pathlib import Path
from subprocess import PIPE, Popen
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional

from airflow.models import Variable

@@ -67,6 +67,33 @@ class DbtNode:
has_freshness: bool = False
has_test: bool = False

@property
def airflow_task_config(self) -> Dict[str, Any]:
"""
This method is designed to extend the dbt project's functionality by incorporating Airflow-related metadata into the dbt YAML configuration.
Since dbt projects are independent of Airflow, adding Airflow-specific information to the `meta` field within the dbt YAML allows Airflow tasks to
utilize this information during execution.
Examples: pool, pool_slots, queue, ...
Returns:
Dict[str, Any]: A dictionary containing custom metadata configurations for integration with Airflow.
"""

if "meta" in self.config:
meta = self.config["meta"]
if "cosmos" in meta:
cosmos = meta["cosmos"]
if isinstance(cosmos, dict):
if "operator_kwargs" in cosmos:
operator_kwargs = cosmos["operator_kwargs"]
if isinstance(operator_kwargs, dict):
return operator_kwargs
else:
logger.error(f"Invalid type: 'operator_kwargs' in meta.cosmos must be a dict.")
else:
logger.error(f"Invalid type: 'cosmos' in meta must be a dict.")
return {}

@property
def resource_name(self) -> str:
"""
@@ -133,6 +160,7 @@ def is_freshness_effective(freshness: Optional[dict[str, Any]]) -> bool:

def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
command = [str(arg) if arg is not None else "<None>" for arg in command]
logger.info("Running command: `%s`", " ".join(command))
logger.debug("Environment variable keys: %s", env_vars.keys())
process = Popen(
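
In dbt, the values read by the airflow_task_config property added above are expected under `meta.cosmos.operator_kwargs` of the node's config. Below is a minimal sketch of that lookup, run against a hand-written node config; the model settings and values are illustrative, not from this repository.

from typing import Any, Dict

# Hypothetical parsed dbt node config, as it could appear in DbtNode.config after Cosmos
# loads the dbt manifest; the settings and values are illustrative.
node_config: Dict[str, Any] = {
    "materialized": "table",
    "meta": {
        "cosmos": {
            "operator_kwargs": {"pool": "dbt_pool", "queue": "analytics"},
        },
    },
}


def extract_airflow_task_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror of the lookup performed by the new DbtNode.airflow_task_config property."""
    cosmos = config.get("meta", {}).get("cosmos")
    if isinstance(cosmos, dict) and isinstance(cosmos.get("operator_kwargs"), dict):
        return cosmos["operator_kwargs"]
    return {}


print(extract_airflow_task_config(node_config))  # {'pool': 'dbt_pool', 'queue': 'analytics'}
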
217 changes: 217 additions & 0 deletions cosmos/io.py
@@ -0,0 +1,217 @@
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

from cosmos import settings
from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
from cosmos.exceptions import CosmosValueError
from cosmos.settings import remote_target_path, remote_target_path_conn_id


def upload_to_aws_s3(
project_dir: str,
bucket_name: str,
aws_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function, intended for use as a callback, demonstrating how to upload files to AWS S3.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param bucket_name: Name of the S3 bucket to upload to.
:param aws_conn_id: AWS connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

target_dir = f"{project_dir}/{source_subpath}"
aws_conn_id = aws_conn_id if aws_conn_id else S3Hook.default_conn_name
hook = S3Hook(aws_conn_id=aws_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to S3
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
s3_key = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
filename=f"{dirpath}/{filename}",
bucket_name=bucket_name,
key=s3_key,
replace=True,
)


def upload_to_gcp_gs(
project_dir: str,
bucket_name: str,
gcp_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function, intended for use as a callback, demonstrating how to upload files to GCP GS.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param bucket_name: Name of the GCP GS bucket to upload to.
:param gcp_conn_id: GCP connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.google.cloud.hooks.gcs import GCSHook

target_dir = f"{project_dir}/{source_subpath}"
gcp_conn_id = gcp_conn_id if gcp_conn_id else GCSHook.default_conn_name
# bucket_name = kwargs["bucket_name"]
hook = GCSHook(gcp_conn_id=gcp_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to GCP GS
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
object_name = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.upload(
filename=f"{dirpath}/{filename}",
bucket_name=bucket_name,
object_name=object_name,
)


def upload_to_azure_wasb(
project_dir: str,
container_name: str,
azure_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function, intended for use as a callback, demonstrating how to upload files to Azure WASB.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param container_name: Name of the Azure WASB container to upload files to.
:param azure_conn_id: Azure connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

target_dir = f"{project_dir}/{source_subpath}"
azure_conn_id = azure_conn_id if azure_conn_id else WasbHook.default_conn_name
# container_name = kwargs["container_name"]
hook = WasbHook(wasb_conn_id=azure_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to WASB container
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
blob_name = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
file_path=f"{dirpath}/{filename}",
container_name=container_name,
blob_name=blob_name,
overwrite=True,
)


def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
"""Configure the remote target path if it is provided."""
from airflow.version import version as airflow_version

if not remote_target_path:
return None, None

_configured_target_path = None

target_path_str = str(remote_target_path)

remote_conn_id = remote_target_path_conn_id
if not remote_conn_id:
target_path_schema = urlparse(target_path_str).scheme
remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment]
if remote_conn_id is None:
return None, None

if not settings.AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote target path {target_path_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)

from airflow.io.path import ObjectStoragePath

_configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id)

if not _configured_target_path.exists(): # type: ignore[no-untyped-call]
_configured_target_path.mkdir(parents=True, exist_ok=True)

return _configured_target_path, remote_conn_id


def _construct_dest_file_path(
dest_target_dir: Path,
file_path: str,
source_target_dir: Path,
source_subpath: str,
**kwargs: Any,
) -> str:
"""
Construct the destination path for the artifact files to be uploaded to the remote store.
"""
dest_target_dir_str = str(dest_target_dir).rstrip("/")

context = kwargs["context"]
task_run_identifier = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
)
rel_path = os.path.relpath(file_path, source_target_dir).lstrip("/")

return f"{dest_target_dir_str}/{task_run_identifier}/{source_subpath}/{rel_path}"


def upload_to_cloud_storage(project_dir: str, source_subpath: str = DEFAULT_TARGET_PATH, **kwargs: Any) -> None:
"""
Helper function, intended for use as a callback, demonstrating how to upload files to remote object stores. It
requires Airflow >= 2.8 and relies on the Cosmos configurations ``remote_target_path`` and
``remote_target_path_conn_id`` when they are set.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
dest_target_dir, dest_conn_id = _configure_remote_target_path()

if not dest_target_dir:
raise CosmosValueError("You're trying to upload artifact files, but the remote target path is not configured.")

from airflow.io.path import ObjectStoragePath

source_target_dir = Path(project_dir) / f"{source_subpath}"
files = [str(file) for file in source_target_dir.rglob("*") if file.is_file()]
for file_path in files:
dest_file_path = _construct_dest_file_path(
dest_target_dir, file_path, source_target_dir, source_subpath, **kwargs
)
dest_object_storage_path = ObjectStoragePath(dest_file_path, conn_id=dest_conn_id)
ObjectStoragePath(file_path).copy(dest_object_storage_path)
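
For reference, the three upload helpers and _construct_dest_file_path all derive the destination key from the same DAG-run metadata. The sketch below, using a fabricated Airflow context, shows the resulting layout under the bucket or remote target path; every value in it is illustrative.

import os

# Fabricated run metadata standing in for the Airflow context the helpers receive.
dag_id, run_id, task_id, try_number = "my_dbt_dag", "manual__2024-12-17T00:00:00+00:00", "model_run", 1

project_dir = "/tmp/cosmos/my_project"  # cloned project dir a Cosmos task works from
file_path = f"{project_dir}/target/run_results.json"

# Same shape as the s3_key / object_name / blob_name built by the helpers above,
# and as the path produced by _construct_dest_file_path() for the remote target store.
rel_path = os.path.relpath(file_path, f"{project_dir}/target")
dest_key = f"{dag_id}/{run_id}/{task_id}/{try_number}/target/{rel_path}"
print(dest_key)
# my_dbt_dag/manual__2024-12-17T00:00:00+00:00/model_run/1/target/run_results.json
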
7 changes: 5 additions & 2 deletions cosmos/operators/local.py
@@ -141,6 +141,7 @@ def __init__(
invocation_mode: InvocationMode | None = None,
install_deps: bool = False,
callback: Callable[[str], None] | None = None,
callback_args: dict[str, Any] | None = None,
should_store_compiled_sql: bool = True,
should_upload_compiled_sql: bool = False,
append_env: bool = True,
@@ -149,6 +150,7 @@
self.task_id = task_id
self.profile_config = profile_config
self.callback = callback
self.callback_args = callback_args or {}
self.compiled_sql = ""
self.freshness = ""
self.should_store_compiled_sql = should_store_compiled_sql
@@ -500,9 +502,10 @@ def run_command(
self.store_freshness_json(tmp_project_dir, context)
self.store_compiled_sql(tmp_project_dir, context)
self.upload_compiled_sql(tmp_project_dir, context)
self.handle_exception(result)
if self.callback:
self.callback(tmp_project_dir)
self.callback_args.update({"context": context})
self.callback(tmp_project_dir, **self.callback_args)
self.handle_exception(result)

return result

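
Taken together with the cosmos/io.py helpers, the new callback_args parameter lets artifact uploads be wired into a local dbt task. Below is a minimal usage sketch, assuming astronomer-cosmos with this change, Airflow, and the Amazon provider are installed; the DAG id, paths, bucket, and connection IDs are placeholders, not values from this repository.

from datetime import datetime

from airflow import DAG
from cosmos.config import ProfileConfig
from cosmos.io import upload_to_aws_s3
from cosmos.operators.local import DbtRunLocalOperator

profile_config = ProfileConfig(
    profile_name="my_profile",  # placeholder
    target_name="dev",          # placeholder
    profiles_yml_filepath="/usr/local/airflow/dbt/my_project/profiles.yml",  # placeholder
)

with DAG(dag_id="dbt_run_with_artifact_upload", start_date=datetime(2024, 12, 17), schedule=None):
    run_model = DbtRunLocalOperator(
        task_id="run_my_model",
        project_dir="/usr/local/airflow/dbt/my_project",  # placeholder
        profile_config=profile_config,
        # The callback receives the temporary project dir plus **callback_args (including
        # the task "context"), matching the updated run_command() above.
        callback=upload_to_aws_s3,
        callback_args={"bucket_name": "my-artifact-bucket", "aws_conn_id": "aws_default"},
    )
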