Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pre-commit hook for McCabe max complexity check and fix errors #629

Merged
Merged
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ repos:
name: mypy-python
additional_dependencies: [types-PyYAML, types-attrs, attrs, types-requests, types-python-dateutil, apache-airflow]
files: ^cosmos
- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
- id: flake8
entry: pflake8
additional_dependencies: [pyproject-flake8]

ci:
autofix_commit_msg: 🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
Expand Down
208 changes: 106 additions & 102 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,64 @@
has_test: bool = False


def create_symlinks(dbt_project_path: Path, tmp_dir: Path) -> None:
"""Helper function to create symlinks to the dbt project files."""
ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml")
for child_name in os.listdir(dbt_project_path):
if child_name not in ignore_paths:
os.symlink(dbt_project_path / child_name, tmp_dir / child_name)


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
logger.info("Running command: `%s`", " ".join(command))
logger.info("Environment variable keys: %s", env_vars.keys())
process = Popen(
command,
stdout=PIPE,
stderr=PIPE,
cwd=tmp_dir,
universal_newlines=True,
env=env_vars,
)
stdout, stderr = process.communicate()
returncode = process.returncode

if 'Run "dbt deps" to install package dependencies' in stdout and command[1] == "ls":
raise CosmosLoadDbtException(
"Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
)

if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run {command} due to the error:\n{details}")

return stdout


def parse_dbt_ls_output(dbt_project_path: Path, ls_stdout: str) -> dict[str, DbtNode]:
"""Parses the output of `dbt ls` into a dictionary of `DbtNode` instances."""
nodes = {}
for line in ls_stdout.split("\n"):
try:
node_dict = json.loads(line.strip())
except json.decoder.JSONDecodeError:
logger.debug("Skipped dbt ls line: %s", line)
else:
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=dbt_project_path / node_dict["original_file_path"],
tags=node_dict["tags"],
config=node_dict["config"],
)
nodes[node.unique_id] = node
logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type)
return nodes


class DbtGraph:
"""
A dbt project graph (represented by `nodes` and `filtered_nodes`).
Expand Down Expand Up @@ -131,6 +189,31 @@
else:
load_method[method]()

def run_dbt_ls(self, dbt_project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]:
"""Runs dbt ls command and returns the parsed nodes."""
ls_command = [self.dbt_cmd, "ls", "--output", "json"]

if self.exclude:
ls_command.extend(["--exclude", *self.exclude])

if self.select:
ls_command.extend(["--select", *self.select])

ls_command.extend(self.local_flags)

stdout = run_command(ls_command, tmp_dir, env_vars)

logger.debug("dbt ls output: %s", stdout)
log_filepath = self.log_dir / DBT_LOG_FILENAME
logger.debug("dbt logs available in: %s", log_filepath)
if log_filepath.exists():
with open(log_filepath) as logfile:
for line in logfile:
logger.debug(line.strip())

nodes = parse_dbt_ls_output(dbt_project_path, stdout)
return nodes

def load_via_dbt_ls(self) -> None:
"""
This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command
Expand All @@ -148,35 +231,30 @@
self.project.project_name,
self.project.dbt_project_path,
)
if self.project.dbt_project_path is None:
raise CosmosLoadDbtException("Unable to dbt ls load a project without a project path.")

Check warning on line 235 in cosmos/dbt/graph.py

View check run for this annotation

Codecov / codecov/patch

cosmos/dbt/graph.py#L235

Added line #L235 was not covered by tests
tatiana marked this conversation as resolved.
Show resolved Hide resolved

if not self.project.dbt_project_path or not self.profile_config:
raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config")

if not shutil.which(self.dbt_cmd):
raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}")

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values:
(profile_path, env_vars) = profile_values
env = os.environ.copy()
env.update(env_vars)
with tempfile.TemporaryDirectory() as tmpdir:
logger.info(
"Content of the dbt project dir <%s>: `%s`",
self.project.dbt_project_path,
os.listdir(self.project.dbt_project_path),
)
tmpdir_path = Path(tmpdir)
create_symlinks(self.project.dbt_project_path, tmpdir_path)

with tempfile.TemporaryDirectory() as tmpdir:
logger.info(
"Content of the dbt project dir <%s>: `%s`",
self.project.dbt_project_path,
os.listdir(self.project.dbt_project_path),
)
logger.info("Creating symlinks from %s to `%s`", self.project.dbt_project_path, tmpdir)
# We create symbolic links to the original directory files and directories.
# This allows us to run the dbt command from within the temporary directory, outputting any necessary
# artifact and also allow us to run `dbt deps`
tmpdir_path = Path(tmpdir)
ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml")
for child_name in os.listdir(self.project.dbt_project_path):
if child_name not in ignore_paths:
os.symlink(self.project.dbt_project_path / child_name, tmpdir_path / child_name)

local_flags = [
with self.profile_config.ensure_profile(use_mock_values=True) as profile_values:
(profile_path, env_vars) = profile_values
env = os.environ.copy()
env.update(env_vars)

self.local_flags = [
"--project-dir",
str(tmpdir),
"--profiles-dir",
Expand All @@ -186,92 +264,18 @@
"--target",
self.profile_config.target_name,
]
log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME)
target_dir = Path(env.get(DBT_TARGET_PATH_ENVVAR) or tmpdir_path / DBT_TARGET_DIR_NAME)
env[DBT_LOG_PATH_ENVVAR] = str(log_dir)
env[DBT_TARGET_PATH_ENVVAR] = str(target_dir)
self.log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME)
self.target_dir = Path(env.get(DBT_TARGET_PATH_ENVVAR) or tmpdir_path / DBT_TARGET_DIR_NAME)
env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir)
env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

if self.dbt_deps:
deps_command = [self.dbt_cmd, "deps"]
deps_command.extend(local_flags)
logger.info("Running command: `%s`", " ".join(deps_command))
logger.info("Environment variable keys: %s", env.keys())
process = Popen(
deps_command,
stdout=PIPE,
stderr=PIPE,
cwd=tmpdir,
universal_newlines=True,
env=env,
)
stdout, stderr = process.communicate()
returncode = process.returncode
deps_command.extend(self.local_flags)
stdout = run_command(deps_command, tmpdir_path, env)
logger.debug("dbt deps output: %s", stdout)

if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}")

ls_command = [self.dbt_cmd, "ls", "--output", "json"]

if self.exclude:
ls_command.extend(["--exclude", *self.exclude])

if self.select:
ls_command.extend(["--select", *self.select])

ls_command.extend(local_flags)

logger.info("Running command: `%s`", " ".join(ls_command))
logger.info("Environment variable keys: %s", env.keys())

process = Popen(
ls_command,
stdout=PIPE,
stderr=PIPE,
cwd=tmpdir,
universal_newlines=True,
env=env,
)

stdout, stderr = process.communicate()
returncode = process.returncode

logger.debug("dbt output: %s", stdout)
log_filepath = log_dir / DBT_LOG_FILENAME
logger.debug("dbt logs available in: %s", log_filepath)
if log_filepath.exists():
with open(log_filepath) as logfile:
for line in logfile:
logger.debug(line.strip())

if 'Run "dbt deps" to install package dependencies' in stdout:
raise CosmosLoadDbtException(
"Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
)

if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}")

nodes = {}
for line in stdout.split("\n"):
try:
node_dict = json.loads(line.strip())
except json.decoder.JSONDecodeError:
logger.debug("Skipped dbt ls line: %s", line)
else:
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=self.project.dbt_project_path / node_dict["original_file_path"],
tags=node_dict["tags"],
config=node_dict["config"],
)
nodes[node.unique_id] = node
logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type)
nodes = self.run_dbt_ls(self.project.dbt_project_path, tmpdir_path, env)

self.nodes = nodes
self.filtered_nodes = nodes
Expand Down
Loading
Loading