Skip to content

Commit

Permalink
Merge pull request #256 from godatadriven/alphabetical-checks
Browse files Browse the repository at this point in the history
Alphabetical checks
  • Loading branch information
pgoslatara authored Sep 24, 2024
2 parents 39add8e + ff11b2e commit b4c8bda
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 233 deletions.
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ repos:
name: mypy
require_serial: true
types_or: [python, pyi]
- repo: local
hooks:
- id: alphabetical-checks
args: []
entry: poetry run python ./scripts/assert_alphabetical_checks.py
language: system
name: alphabetical-checks
require_serial: true
types_or: [python, pyi]
- repo: local
hooks:
- id: sqlfmt
Expand Down
30 changes: 30 additions & 0 deletions scripts/assert_alphabetical_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import ast
import logging
from pathlib import Path

from dbt_bouncer.logger import configure_console_logging


def main() -> None:
    """Assert that all checks are alphabetically sorted.

    Every file matching ``src/dbt_bouncer/checks/*/*.py`` (relative to the
    current working directory) is parsed, and the top-level classes whose name
    starts with ``Check`` must be defined in alphabetical order.

    Raises:
        AssertionError: If the ``Check*`` classes in a file are not sorted.

    """
    # Visit files in sorted order so the first failure reported is
    # deterministic; the order yielded by `glob` depends on the filesystem.
    for f in sorted(Path("src/dbt_bouncer/checks").glob("*/*.py")):
        logging.info("Checking %s...", f.name)
        # Explicit encoding: source files are UTF-8 regardless of the locale.
        node = ast.parse(f.read_text(encoding="utf-8"), filename=str(f))

        # Only top-level classes are considered; nested classes are ignored.
        class_names = [
            n.name
            for n in node.body
            if isinstance(n, ast.ClassDef) and n.name.startswith("Check")
        ]
        expected = sorted(class_names)
        is_sorted = class_names == expected
        logging.debug("class_names=%r", class_names)
        logging.debug("sorted(class_names)=%r", expected)
        logging.debug(is_sorted)

        # Raise explicitly instead of using `assert`, which is stripped when
        # Python runs with -O and would make this check silently pass.
        if not is_sorted:
            raise AssertionError(
                f"Class names are not sorted alphabetically in {f.name}"
            )


if __name__ == "__main__":
    # Script entry point: set up console logging, then run the check.
    # NOTE(review): the argument `1` is presumably a verbosity level --
    # confirm against `dbt_bouncer.logger.configure_console_logging`.
    configure_console_logging(1)
    main()
114 changes: 57 additions & 57 deletions src/dbt_bouncer/checks/catalog/check_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,63 @@ def execute(self) -> None:
assert not non_complying_columns, f"`{self.catalog_node.unique_id.split('.')[-1]}` has columns that do not have a populated description: {non_complying_columns}"


class CheckColumnHasSpecifiedTest(BaseCheck):
    """Columns that match the specified regexp pattern must have a specified test.

    Parameters:
        column_name_pattern (str): Regex pattern to match the column name.
        test_name (str): Name of the test to check for.

    Receives:
        catalog_node (CatalogTable): The CatalogTable object to check.
        tests (List[DbtBouncerTestBase]): List of DbtBouncerTestBase objects parsed from `manifest.json`.

    Other Parameters:
        exclude (Optional[str]): Regex pattern to match the model path. Model paths that match the pattern will not be checked.
        include (Optional[str]): Regex pattern to match the model path. Only model paths that match the pattern will be checked.
        severity (Optional[Literal["error", "warn"]]): Severity level of the check. Default: `error`.

    Example(s):
        ```yaml
        catalog_checks:
            - name: check_column_has_specified_test
              column_name_pattern: ^is_.*
              test_name: not_null
        ```

    """

    catalog_node: "CatalogTable" = Field(default=None)
    column_name_pattern: str
    name: Literal["check_column_has_specified_test"]
    test_name: str
    tests: List["DbtBouncerTestBase"] = Field(default=[])

    def execute(self) -> None:
        """Execute the check.

        Raises:
            AssertionError: If a column whose name matches `column_name_pattern`
                has no `test_name` test attached to it on this catalog node.

        """
        # Compile the pattern once instead of once per column.
        column_pattern = re.compile(self.column_name_pattern.strip())
        columns_to_check = [
            v.name
            for v in self.catalog_node.columns.values()
            if column_pattern.match(v.name) is not None
        ]
        # Tests with the requested name that are attached to this node.
        relevant_tests = [
            t
            for t in self.tests
            if hasattr(t, "test_metadata")
            and hasattr(t, "attached_node")
            and t.test_metadata.name == self.test_name
            and t.attached_node == self.catalog_node.unique_id
        ]
        # Build the lookup of tested "<node>.<column>" keys once, as a set,
        # rather than rebuilding a list for every column. It is only built
        # when there are columns to check, so `column_name` is read on the
        # tests exactly when a membership test actually needs it.
        tested_columns = (
            {f"{t.attached_node}.{t.column_name}" for t in relevant_tests}
            if columns_to_check
            else set()
        )
        non_complying_columns = [
            c
            for c in columns_to_check
            if f"{self.catalog_node.unique_id}.{c}" not in tested_columns
        ]

        assert not non_complying_columns, f"`{self.catalog_node.unique_id.split('.')[-1]}` has columns that should have a `{self.test_name}` test: {non_complying_columns}"


class CheckColumnNameCompliesToColumnType(BaseCheck):
"""Columns with specified data types must comply to the specified regexp naming pattern.
Expand Down Expand Up @@ -202,60 +259,3 @@ def execute(self) -> None:
non_complying_columns.append(v.name)

assert not non_complying_columns, f"`{self.catalog_node.unique_id.split('.')[-1]}` is a public model but has columns that don't have a populated description: {non_complying_columns}"


class CheckColumnHasSpecifiedTest(BaseCheck):
    """Columns that match the specified regexp pattern must have a specified test.

    Parameters:
        column_name_pattern (str): Regex pattern to match the column name.
        test_name (str): Name of the test to check for.

    Receives:
        catalog_node (CatalogTable): The CatalogTable object to check.
        tests (List[DbtBouncerTestBase]): List of DbtBouncerTestBase objects parsed from `manifest.json`.

    Other Parameters:
        exclude (Optional[str]): Regex pattern to match the model path. Model paths that match the pattern will not be checked.
        include (Optional[str]): Regex pattern to match the model path. Only model paths that match the pattern will be checked.
        severity (Optional[Literal["error", "warn"]]): Severity level of the check. Default: `error`.

    Example(s):
        ```yaml
        catalog_checks:
            - name: check_column_has_specified_test
              column_name_pattern: ^is_.*
              test_name: not_null
        ```

    """

    catalog_node: "CatalogTable" = Field(default=None)
    column_name_pattern: str
    name: Literal["check_column_has_specified_test"]
    test_name: str
    tests: List["DbtBouncerTestBase"] = Field(default=[])

    def execute(self) -> None:
        """Execute the check.

        Raises:
            AssertionError: If a column whose name matches `column_name_pattern`
                has no `test_name` test attached to it on this catalog node.

        """
        # Compile the pattern once instead of once per column.
        column_pattern = re.compile(self.column_name_pattern.strip())
        columns_to_check = [
            v.name
            for v in self.catalog_node.columns.values()
            if column_pattern.match(v.name) is not None
        ]
        # Tests with the requested name that are attached to this node.
        relevant_tests = [
            t
            for t in self.tests
            if hasattr(t, "test_metadata")
            and hasattr(t, "attached_node")
            and t.test_metadata.name == self.test_name
            and t.attached_node == self.catalog_node.unique_id
        ]
        # Build the lookup of tested "<node>.<column>" keys once, as a set,
        # rather than rebuilding a list for every column. It is only built
        # when there are columns to check, so `column_name` is read on the
        # tests exactly when a membership test actually needs it.
        tested_columns = (
            {f"{t.attached_node}.{t.column_name}" for t in relevant_tests}
            if columns_to_check
            else set()
        )
        non_complying_columns = [
            c
            for c in columns_to_check
            if f"{self.catalog_node.unique_id}.{c}" not in tested_columns
        ]

        assert not non_complying_columns, f"`{self.catalog_node.unique_id.split('.')[-1]}` has columns that should have a `{self.test_name}` test: {non_complying_columns}"
Loading

0 comments on commit b4c8bda

Please sign in to comment.