Skip to content

Commit

Permalink
Merge pull request #770 from dyvenia/df_tests_sv_tests
Browse files Browse the repository at this point in the history
✨ Added test for column sum+ tests for validate_df
  • Loading branch information
m-paz authored Oct 24, 2023
2 parents 1e2d708 + c3dd445 commit 5379468
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 19 deletions.
100 changes: 98 additions & 2 deletions tests/unit/test_task_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
from typing import List
from unittest import mock

import numpy as np
import pandas as pd
import prefect
import pytest

from viadot.exceptions import ValidationError
from viadot.task_utils import (
add_ingestion_metadata_task,
adls_bulk_upload,
Expand All @@ -21,6 +20,7 @@
dtypes_to_json_task,
union_dfs_task,
write_to_json,
validate_df,
)


Expand Down Expand Up @@ -393,3 +393,99 @@ def test_wrong_method():
)
with pytest.raises(ValueError, match="Method not found"):
anonymize_df.run(data, ["name", "last_name", "email"], method="anonymize")


def test_validate_df_column_size_pass():
    """Expect no ValidationError for a `column_size` of 3 on values "a", "bb", "ccc"."""
    df = pd.DataFrame({"col1": ["a", "bb", "ccc"]})
    tests = {"column_size": {"col1": 3}}
    try:
        validate_df.run(df, tests)
    except ValidationError as exc:
        # pytest.fail, unlike `assert False`, is not stripped under `python -O`,
        # so an unexpected validation failure can never pass silently.
        pytest.fail(f"Validation failed but was expected to pass: {exc}")


def test_validate_df_column_size_fail():
    """Expect ValidationError for a `column_size` of 3 on values "a", "bb", "cccc"."""
    checks = {"column_size": {"col1": 3}}
    frame = pd.DataFrame({"col1": ["a", "bb", "cccc"]})

    with pytest.raises(ValidationError):
        validate_df.run(frame, checks)


def test_validate_df_column_unique_values_pass():
    """Expect no ValidationError for `column_unique_values` on a column holding 1, 2, 3."""
    df = pd.DataFrame({"col1": [1, 2, 3]})
    tests = {"column_unique_values": ["col1"]}
    try:
        validate_df.run(df, tests)
    except ValidationError as exc:
        # pytest.fail, unlike `assert False`, is not stripped under `python -O`,
        # so an unexpected validation failure can never pass silently.
        pytest.fail(f"Validation failed but was expected to pass: {exc}")


def test_validate_df_column_unique_values_fail():
    """Expect ValidationError for `column_unique_values` on a column holding 1, 2, 2."""
    checks = {"column_unique_values": ["col1"]}
    frame = pd.DataFrame({"col1": [1, 2, 2]})

    with pytest.raises(ValidationError):
        validate_df.run(frame, checks)


def test_validate_df_column_list_to_match_pass():
    """Expect no ValidationError when `column_list_to_match` names both DataFrame columns."""
    df = pd.DataFrame({"col1": [1], "col2": [2]})
    tests = {"column_list_to_match": ["col1", "col2"]}
    try:
        validate_df.run(df, tests)
    except ValidationError as exc:
        # pytest.fail, unlike `assert False`, is not stripped under `python -O`,
        # so an unexpected validation failure can never pass silently.
        pytest.fail(f"Validation failed but was expected to pass: {exc}")


def test_validate_df_column_list_to_match_fail():
    """Expect ValidationError when `column_list_to_match` names a column the DataFrame lacks."""
    checks = {"column_list_to_match": ["col1", "col2"]}
    frame = pd.DataFrame({"col1": [1]})

    with pytest.raises(ValidationError):
        validate_df.run(frame, checks)


def test_validate_df_dataset_row_count_pass():
    """Expect no ValidationError for 3 rows with `dataset_row_count` bounds min=1, max=5."""
    df = pd.DataFrame({"col1": [1, 2, 3]})
    tests = {"dataset_row_count": {"min": 1, "max": 5}}
    try:
        validate_df.run(df, tests)
    except ValidationError as exc:
        # pytest.fail, unlike `assert False`, is not stripped under `python -O`,
        # so an unexpected validation failure can never pass silently.
        pytest.fail(f"Validation failed but was expected to pass: {exc}")


def test_validate_df_dataset_row_count_fail():
    """Expect ValidationError for 6 rows with `dataset_row_count` bounds min=1, max=5."""
    checks = {"dataset_row_count": {"min": 1, "max": 5}}
    frame = pd.DataFrame({"col1": [1, 2, 3, 4, 5, 6]})

    with pytest.raises(ValidationError):
        validate_df.run(frame, checks)


def test_validate_df_column_match_regex_pass():
    """Expect no ValidationError when every value matches the `column_match_regex` pattern."""
    df = pd.DataFrame({"col1": ["A12", "B34", "C45"]})
    tests = {"column_match_regex": {"col1": "^[A-Z][0-9]{2}$"}}
    try:
        validate_df.run(df, tests)
    except ValidationError as exc:
        # pytest.fail, unlike `assert False`, is not stripped under `python -O`,
        # so an unexpected validation failure can never pass silently.
        pytest.fail(f"Validation failed but was expected to pass: {exc}")


def test_validate_df_column_match_regex_fail():
    """Expect ValidationError when one value ("A123") does not match the pattern."""
    checks = {"column_match_regex": {"col1": "^[A-Z][0-9]{2}$"}}
    frame = pd.DataFrame({"col1": ["A123", "B34", "C45"]})

    with pytest.raises(ValidationError):
        validate_df.run(frame, checks)


def test_validate_df_column_sum_pass():
    """Expect no ValidationError for a column summing to 6 with `column_sum` bounds 5..10."""
    df = pd.DataFrame({"col1": [1, 2, 3]})
    tests = {"column_sum": {"col1": {"min": 5, "max": 10}}}
    try:
        validate_df.run(df, tests)
    except ValidationError as exc:
        # pytest.fail, unlike `assert False`, is not stripped under `python -O`,
        # so an unexpected validation failure can never pass silently.
        pytest.fail(f"Validation failed but was expected to pass: {exc}")


def test_validate_df_column_sum_fail():
    """Expect ValidationError for a column summing to 10 with `column_sum` bounds 5..6."""
    checks = {"column_sum": {"col1": {"min": 5, "max": 6}}}
    frame = pd.DataFrame({"col1": [1, 2, 3, 4]})

    with pytest.raises(ValidationError):
        validate_df.run(frame, checks)
62 changes: 45 additions & 17 deletions viadot/task_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import os
import shutil
import re
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, List, Literal, Union, cast
Expand Down Expand Up @@ -674,13 +674,14 @@ def anonymize_df(
@task(timeout=3600)
def validate_df(df: pd.DataFrame, tests: dict = None) -> None:
"""
Task to validate the data on DataFrame level.
Task to validate the data on DataFrame level. All numbers in the ranges are inclusive.
tests:
- `column_size`: dict{column: size}
- `column_unique_values`: list[columns]
- `column_list_to_match`: list[columns]
- `dataset_row_count`: dict: {'min': number, 'max': number}
- `column_match_regex`: dict: {column: 'regex'}
- `column_sum`: dict: {column: {'min': number, 'max': number}}
Args:
df (pd.DataFrame): The data frame for validation.
Expand All @@ -690,8 +691,9 @@ def validate_df(df: pd.DataFrame, tests: dict = None) -> None:
ValidationError: If validation failed for at least one test.
"""
failed_tests = 0
failed_tests_list = []

if tests is not None:
# column_row_count
if "column_size" in tests:
try:
for k, v in tests["column_size"].items():
Expand All @@ -706,14 +708,14 @@ def validate_df(df: pd.DataFrame, tests: dict = None) -> None:
f"[column_size] test for {k} failed. field lenght is different than {v}"
)
failed_tests += 1
failed_tests_list.append("column_size error")
except Exception as e:
logger.error(f"{e}")
except TypeError as e:
logger.error(
"Please provide `column_size` parameter as dictionary {'columns': value}."
)

# column_unique_values
if "column_unique_values" in tests:
for column in tests["column_unique_values"]:
df_size = df.shape[0]
Expand All @@ -723,44 +725,70 @@ def validate_df(df: pd.DataFrame, tests: dict = None) -> None:
)
else:
failed_tests += 1
failed_tests_list.append("column_unique_values error")
logger.error(
f"[column_unique_values] Values for {column} are not unique."
)

# list_column_to_match
if "column_list_to_match" in tests:
if set(tests["column_list_to_match"]) == set(df.columns):
logger.info(f"[column_list_to_match] passed.")
else:
failed_tests += 1
failed_tests_list.append("column_list_to_match error")
logger.error(
"[column_list_to_match] failed. Columns are different than expected."
)

# dataset_row_count
if "dataset_row_count" in tests:
row_count = len(df.iloc[:, 0])
max_value = tests["dataset_row_count"]["max"] or 10_000_000
max_value = tests["dataset_row_count"]["max"] or 100_000_000
min_value = tests["dataset_row_count"]["min"] or 0

if (row_count > min_value) and (row_count < max_value):
print("[dataset_row_count] passed.")
logger.info("[dataset_row_count] passed.")
else:
failed_tests += 1
logging.error(
f"[dataset_row_count] Row count is not between {min_value} and {max_value}"
failed_tests_list.append("dataset_row_count error")
logger.error(
f"[dataset_row_count] Row count ({row_count}) is not between {min_value} and {max_value}."
)
# to improve

if "column_match_regex" in tests:
for k, v in tests["column_match_regex"].items():
if df[k].apply(lambda x: re.match("(g\w+)\W(g\w+)", x)):
print(f"[column_match_regex] on {k} column passed.")
try:
matches = df[k].apply(lambda x: bool(re.match(v, str(x))))
if all(matches):
logger.info(f"[column_match_regex] on {k} column passed.")
else:
failed_tests += 1
failed_tests_list.append("column_match_regex error")
logger.error(f"[column_match_regex] on {k} column failed!")
except Exception as e:
failed_tests += 1
failed_tests_list.append("column_match_regex error")
logger.error(f"[column_match_regex] Error in {k} column: {e}")

if "column_sum" in tests:
for column, bounds in tests["column_sum"].items():
col_sum = df[column].sum()
min_bound = bounds["min"]
max_bound = bounds["max"]
if min_bound <= col_sum <= max_bound:
logger.info(
f"[column_sum] Sum of {col_sum} for {column} is within the expected range."
)
else:
failed_tests += 1
logging.error(f"[column_match_regex] on {k} column filed!")

failed_tests_list.append("column_sum error")
logger.error(
f"[column_sum] Sum of {col_sum} for {column} is out of the expected range - <{min_bound}:{max_bound}>"
)
else:
return "No tests to run."
return "No dataframe tests to run."

if failed_tests > 0:
raise ValidationError(f"Validation failed for {failed_tests} test/tests.")
failed_tests_msg = ", ".join(failed_tests_list)
raise ValidationError(
f"Validation failed for {failed_tests} test/tests: {failed_tests_msg}"
)

0 comments on commit 5379468

Please sign in to comment.