Skip to content

Commit

Permalink
Merge branch 'main' into add-templating-for-profiles_args
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana authored Nov 2, 2023
2 parents c5894ff + 358f4b0 commit e719253
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 235 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ jobs:
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:postgres@0.0.0.0:5432/postgres
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }}
Expand Down Expand Up @@ -192,6 +193,7 @@ jobs:
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:postgres@0.0.0.0:5432/postgres
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }}
Expand Down Expand Up @@ -256,6 +258,7 @@ jobs:
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
Expand Down
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ repos:
name: mypy-python
additional_dependencies: [types-PyYAML, types-attrs, attrs, types-requests, types-python-dateutil, apache-airflow]
files: ^cosmos
- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
- id: flake8
entry: pflake8
additional_dependencies: [pyproject-flake8]

ci:
autofix_commit_msg: 🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
Expand Down
208 changes: 106 additions & 102 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,64 @@ class DbtNode:
has_test: bool = False


def create_symlinks(dbt_project_path: Path, tmp_dir: Path) -> None:
    """
    Mirror the top-level entries of a dbt project into ``tmp_dir`` as symbolic links.

    Every entry listed directly under ``dbt_project_path`` gets a same-named link inside ``tmp_dir``, except for
    the dbt log directory, the dbt target directory, ``dbt_packages`` and ``profiles.yml``, which are not linked.

    :param dbt_project_path: Directory whose direct children are linked.
    :param tmp_dir: Directory in which the symbolic links are created.
    """
    excluded_names = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml")
    for entry_name in os.listdir(dbt_project_path):
        if entry_name in excluded_names:
            continue
        link_target = dbt_project_path / entry_name
        link_location = tmp_dir / entry_name
        os.symlink(link_target, link_location)


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
    """
    Run a command in a subprocess, returning the stdout.

    :param command: The executable followed by its arguments, e.g. ``["dbt", "ls", ...]``.
    :param tmp_dir: Directory used as the working directory of the subprocess.
    :param env_vars: Environment handed to the subprocess as-is (it is not merged with ``os.environ`` here).
    :returns: The text the subprocess wrote to stdout.
    :raises CosmosLoadDbtException: If an ``ls`` command reports that ``dbt deps`` has to be run, if the process
        exits with a non-zero return code, or if the word ``Error`` appears in stdout.
    """
    logger.info("Running command: `%s`", " ".join(command))
    logger.info("Environment variable keys: %s", env_vars.keys())
    process = Popen(
        command,
        stdout=PIPE,
        stderr=PIPE,
        cwd=tmp_dir,
        universal_newlines=True,
        env=env_vars,
    )
    stdout, stderr = process.communicate()
    returncode = process.returncode

    # Check the length before indexing: a command made of the executable alone has no sub-command, and
    # `command[1]` would otherwise raise IndexError instead of reporting the real failure below.
    is_ls_command = len(command) > 1 and command[1] == "ls"
    if is_ls_command and 'Run "dbt deps" to install package dependencies' in stdout:
        raise CosmosLoadDbtException(
            "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
        )

    if returncode or "Error" in stdout:
        # Prefer stderr for the error details; fall back to stdout when stderr is empty.
        details = stderr or stdout
        raise CosmosLoadDbtException(f"Unable to run {command} due to the error:\n{details}")

    return stdout


def parse_dbt_ls_output(dbt_project_path: Path, ls_stdout: str) -> dict[str, DbtNode]:
    """
    Build a mapping of ``unique_id`` to ``DbtNode`` from the text printed by ``dbt ls``.

    The output is processed line by line. Lines that cannot be decoded as JSON are logged at debug level and
    skipped; every other line is converted into a ``DbtNode``.

    :param dbt_project_path: Path prepended to each node's ``original_file_path`` to build its ``file_path``.
    :param ls_stdout: The stdout captured from the ``dbt ls`` command.
    :returns: The parsed nodes, keyed by their ``unique_id``.
    """
    parsed_nodes = {}
    for raw_line in ls_stdout.split("\n"):
        try:
            payload = json.loads(raw_line.strip())
        except json.decoder.JSONDecodeError:
            logger.debug("Skipped dbt ls line: %s", raw_line)
            continue

        # The node name is the `alias` when present, otherwise the `name`.
        dbt_node = DbtNode(
            name=payload.get("alias", payload["name"]),
            unique_id=payload["unique_id"],
            resource_type=DbtResourceType(payload["resource_type"]),
            depends_on=payload.get("depends_on", {}).get("nodes", []),
            file_path=dbt_project_path / payload["original_file_path"],
            tags=payload["tags"],
            config=payload["config"],
        )
        parsed_nodes[dbt_node.unique_id] = dbt_node
        logger.debug("Parsed dbt resource `%s` of type `%s`", dbt_node.unique_id, dbt_node.resource_type)

    return parsed_nodes


class DbtGraph:
"""
A dbt project graph (represented by `nodes` and `filtered_nodes`).
Expand Down Expand Up @@ -131,6 +189,31 @@ def load(
else:
load_method[method]()

def run_dbt_ls(self, dbt_project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]:
    """
    Run ``dbt ls`` with JSON output and return the nodes parsed from what it prints.

    The command is assembled from ``self.dbt_cmd``, the optional ``self.exclude`` / ``self.select`` arguments and
    ``self.local_flags``. After the command has run, its stdout and the content of the dbt log file (when that
    file exists under ``self.log_dir``) are written to the debug log.

    :param dbt_project_path: Passed to ``parse_dbt_ls_output`` together with the command's stdout.
    :param tmp_dir: Passed to ``run_command`` as the directory to run from.
    :param env_vars: Passed to ``run_command`` as the subprocess environment.
    :returns: The value returned by ``parse_dbt_ls_output`` for the command's stdout.
    """
    command = [self.dbt_cmd, "ls", "--output", "json"]
    if self.exclude:
        command += ["--exclude", *self.exclude]
    if self.select:
        command += ["--select", *self.select]
    command += self.local_flags

    ls_output = run_command(command, tmp_dir, env_vars)
    logger.debug("dbt ls output: %s", ls_output)

    dbt_log_file = self.log_dir / DBT_LOG_FILENAME
    logger.debug("dbt logs available in: %s", dbt_log_file)
    if dbt_log_file.exists():
        with open(dbt_log_file) as log_handle:
            for log_line in log_handle:
                logger.debug(log_line.strip())

    return parse_dbt_ls_output(dbt_project_path, ls_output)

def load_via_dbt_ls(self) -> None:
"""
This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command
Expand All @@ -148,35 +231,30 @@ def load_via_dbt_ls(self) -> None:
self.project.project_name,
self.project.dbt_project_path,
)
if self.project.dbt_project_path is None:
raise CosmosLoadDbtException("Unable to dbt ls load a project without a project path.")

if not self.project.dbt_project_path or not self.profile_config:
raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config")

if not shutil.which(self.dbt_cmd):
raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}")

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values:
(profile_path, env_vars) = profile_values
env = os.environ.copy()
env.update(env_vars)
with tempfile.TemporaryDirectory() as tmpdir:
logger.info(
"Content of the dbt project dir <%s>: `%s`",
self.project.dbt_project_path,
os.listdir(self.project.dbt_project_path),
)
tmpdir_path = Path(tmpdir)
create_symlinks(self.project.dbt_project_path, tmpdir_path)

with tempfile.TemporaryDirectory() as tmpdir:
logger.info(
"Content of the dbt project dir <%s>: `%s`",
self.project.dbt_project_path,
os.listdir(self.project.dbt_project_path),
)
logger.info("Creating symlinks from %s to `%s`", self.project.dbt_project_path, tmpdir)
# We create symbolic links to the original directory files and directories.
# This allows us to run the dbt command from within the temporary directory, outputting any necessary
# artifact and also allow us to run `dbt deps`
tmpdir_path = Path(tmpdir)
ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml")
for child_name in os.listdir(self.project.dbt_project_path):
if child_name not in ignore_paths:
os.symlink(self.project.dbt_project_path / child_name, tmpdir_path / child_name)

local_flags = [
with self.profile_config.ensure_profile(use_mock_values=True) as profile_values:
(profile_path, env_vars) = profile_values
env = os.environ.copy()
env.update(env_vars)

self.local_flags = [
"--project-dir",
str(tmpdir),
"--profiles-dir",
Expand All @@ -186,92 +264,18 @@ def load_via_dbt_ls(self) -> None:
"--target",
self.profile_config.target_name,
]
log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME)
target_dir = Path(env.get(DBT_TARGET_PATH_ENVVAR) or tmpdir_path / DBT_TARGET_DIR_NAME)
env[DBT_LOG_PATH_ENVVAR] = str(log_dir)
env[DBT_TARGET_PATH_ENVVAR] = str(target_dir)
self.log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME)
self.target_dir = Path(env.get(DBT_TARGET_PATH_ENVVAR) or tmpdir_path / DBT_TARGET_DIR_NAME)
env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir)
env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

if self.dbt_deps:
deps_command = [self.dbt_cmd, "deps"]
deps_command.extend(local_flags)
logger.info("Running command: `%s`", " ".join(deps_command))
logger.info("Environment variable keys: %s", env.keys())
process = Popen(
deps_command,
stdout=PIPE,
stderr=PIPE,
cwd=tmpdir,
universal_newlines=True,
env=env,
)
stdout, stderr = process.communicate()
returncode = process.returncode
deps_command.extend(self.local_flags)
stdout = run_command(deps_command, tmpdir_path, env)
logger.debug("dbt deps output: %s", stdout)

if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}")

ls_command = [self.dbt_cmd, "ls", "--output", "json"]

if self.exclude:
ls_command.extend(["--exclude", *self.exclude])

if self.select:
ls_command.extend(["--select", *self.select])

ls_command.extend(local_flags)

logger.info("Running command: `%s`", " ".join(ls_command))
logger.info("Environment variable keys: %s", env.keys())

process = Popen(
ls_command,
stdout=PIPE,
stderr=PIPE,
cwd=tmpdir,
universal_newlines=True,
env=env,
)

stdout, stderr = process.communicate()
returncode = process.returncode

logger.debug("dbt output: %s", stdout)
log_filepath = log_dir / DBT_LOG_FILENAME
logger.debug("dbt logs available in: %s", log_filepath)
if log_filepath.exists():
with open(log_filepath) as logfile:
for line in logfile:
logger.debug(line.strip())

if 'Run "dbt deps" to install package dependencies' in stdout:
raise CosmosLoadDbtException(
"Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
)

if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}")

nodes = {}
for line in stdout.split("\n"):
try:
node_dict = json.loads(line.strip())
except json.decoder.JSONDecodeError:
logger.debug("Skipped dbt ls line: %s", line)
else:
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=self.project.dbt_project_path / node_dict["original_file_path"],
tags=node_dict["tags"],
config=node_dict["config"],
)
nodes[node.unique_id] = node
logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type)
nodes = self.run_dbt_ls(self.project.dbt_project_path, tmpdir_path, env)

self.nodes = nodes
self.filtered_nodes = nodes
Expand Down
Loading

0 comments on commit e719253

Please sign in to comment.