diff --git a/python/.pre-commit-config.yaml b/python/.pre-commit-config.yaml index c267119a8b..8294658557 100644 --- a/python/.pre-commit-config.yaml +++ b/python/.pre-commit-config.yaml @@ -35,7 +35,7 @@ repos: rev: v0.942 hooks: - id: mypy - language: system + language: python exclude: ^(python/tests/|python/examples/|python/examples/integration/|python/whylogs/core/proto/|python/docs/|python/whylogs/viz/html/|java|python/whylogs/api/logger/experimental/logger) - repo: https://github.com/pre-commit/mirrors-prettier rev: v2.5.1 diff --git a/python/Makefile b/python/Makefile index 86ef3e433e..586dd3a4d5 100644 --- a/python/Makefile +++ b/python/Makefile @@ -165,7 +165,7 @@ telemetry-opt-out: ## create opt out file install: ## Install all dependencies with poetry. @$(call i, Installing dependencies) - poetry install -E "viz s3 spark mlflow image fugue gcs embeddings proc" + poetry install -E "viz s3 spark mlflow image fugue gcs embeddings proc polars" coverage: ## Generate test coverage reports. @$(call i, Generating test coverage) diff --git a/python/poetry.lock b/python/poetry.lock index 0eedaeee12..709de63bc4 100644 --- a/python/poetry.lock +++ b/python/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. 
[[package]] name = "2to3" @@ -2890,6 +2890,47 @@ importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "polars" +version = "1.8.2" +description = "Blazingly fast DataFrame library" +optional = true +python-versions = ">=3.8" +files = [ + {file = "polars-1.8.2-cp38-abi3-macosx_10_12_x86_64.whl", hash = "sha256:114be1ebfb051b794fb9e1f15999430c79cc0824595e237d3f45632be3e56d73"}, + {file = "polars-1.8.2-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:e4fc36cfe48972d4c5be21a7cb119d6378fb7af0bb3eeb61456b66a1f43228e3"}, + {file = "polars-1.8.2-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:67c1e448d6e38697650b22dd359f13c40b567c0b66686c8602e4367400e87801"}, + {file = "polars-1.8.2-cp38-abi3-manylinux_2_24_aarch64.whl", hash = "sha256:570ee86b033dc5a6dbe2cb0df48522301642f304dda3da48f53d7488899a2206"}, + {file = "polars-1.8.2-cp38-abi3-win_amd64.whl", hash = "sha256:ce1a1c1e2150ffcc44a5f1c461d738e1dcd95abbd0f210af0271c7ac0c9f7ef9"}, + {file = "polars-1.8.2.tar.gz", hash = "sha256:42f69277d5be2833b0b826af5e75dcf430222d65c9633872856e176a0bed27a0"}, +] + +[package.extras] +adbc = ["adbc-driver-manager[dbapi]", "adbc-driver-sqlite[dbapi]"] +all = ["polars[async,cloudpickle,database,deltalake,excel,fsspec,graph,iceberg,numpy,pandas,plot,pyarrow,pydantic,style,timezone]"] +async = ["gevent"] +calamine = ["fastexcel (>=0.9)"] +cloudpickle = ["cloudpickle"] +connectorx = ["connectorx (>=0.3.2)"] +database = ["nest-asyncio", "polars[adbc,connectorx,sqlalchemy]"] +deltalake = ["deltalake (>=0.15.0)"] +excel = ["polars[calamine,openpyxl,xlsx2csv,xlsxwriter]"] +fsspec = ["fsspec"] +gpu = ["cudf-polars-cu12"] +graph = ["matplotlib"] +iceberg = ["pyiceberg (>=0.5.0)"] +numpy = ["numpy (>=1.16.0)"] +openpyxl = ["openpyxl (>=3.0.0)"] +pandas = ["pandas", "polars[pyarrow]"] +plot = ["altair (>=5.4.0)"] +pyarrow = ["pyarrow (>=7.0.0)"] +pydantic = 
["pydantic"] +sqlalchemy = ["polars[pandas]", "sqlalchemy"] +style = ["great-tables (>=0.8.0)"] +timezone = ["backports-zoneinfo", "tzdata"] +xlsx2csv = ["xlsx2csv (>=0.8.0)"] +xlsxwriter = ["xlsxwriter"] + [[package]] name = "pre-commit" version = "2.20.0" @@ -3918,19 +3959,38 @@ jeepney = ">=0.6" [[package]] name = "setuptools" -version = "68.0.0" +version = "57.5.0" description = "Easily download, build, install, upgrade, and uninstall Python packages" optional = false -python-versions = ">=3.7" +python-versions = ">=3.6" files = [ - {file = "setuptools-68.0.0-py3-none-any.whl", hash = "sha256:11e52c67415a381d10d6b462ced9cfb97066179f0e871399e006c4ab101fc85f"}, - {file = "setuptools-68.0.0.tar.gz", hash = "sha256:baf1fdb41c6da4cd2eae722e135500da913332ab3f2f5c7d33af9b492acb5235"}, + {file = "setuptools-57.5.0-py3-none-any.whl", hash = "sha256:60d78588f15b048f86e35cdab73003d8b21dd45108ee61a6693881a427f22073"}, + {file = "setuptools-57.5.0.tar.gz", hash = "sha256:d9d3266d50f59c6967b9312844470babbdb26304fe740833a5f8d89829ba3a24"}, ] [package.extras] -docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"] -testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] -testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", 
"wheel"] +docs = ["furo", "jaraco.packaging (>=8.2)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx", "sphinx-inline-tabs", "sphinxcontrib-towncrier"] +testing = ["flake8-2020", "jaraco.envs", "jaraco.path (>=3.2.0)", "mock", "paver", "pip (>=19.1)", "pytest (>=4.6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-flake8", "pytest-mypy", "pytest-virtualenv (>=1.2.7)", "pytest-xdist", "sphinx", "virtualenv (>=13.0.0)", "wheel"] + +[[package]] +name = "setuptools" +version = "75.3.0" +description = "Easily download, build, install, upgrade, and uninstall Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "setuptools-75.3.0-py3-none-any.whl", hash = "sha256:f2504966861356aa38616760c0f66568e535562374995367b4e69c7143cf6bcd"}, + {file = "setuptools-75.3.0.tar.gz", hash = "sha256:fba5dd4d766e97be1b1681d98712680ae8f2f26d7881245f2ce9e40714f1a686"}, +] + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.5.2)"] +core = ["importlib-metadata (>=6)", "importlib-resources (>=5.10.2)", "jaraco.collections", "jaraco.functools", "jaraco.text (>=3.7)", "more-itertools", "more-itertools (>=8.8)", "packaging", "packaging (>=24)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] +enabler = ["pytest-enabler (>=2.2)"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test (>=5.5)", "packaging (>=23.2)", "pip (>=19.1)", 
"pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] +type = ["importlib-metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.12.*)", "pytest-mypy"] [[package]] name = "six" @@ -4680,6 +4740,9 @@ files = [ {file = "whylogs_sketching-3.4.1.dev3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0ba536fca5f9578fa34d106c243fdccfef7d75b9d1fffb9d93df0debfe8e3ebc"}, {file = "whylogs_sketching-3.4.1.dev3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:afa843c68cafa08e82624e6a33d13ab7f00ad0301101960872fe152d5af5ab53"}, {file = "whylogs_sketching-3.4.1.dev3-cp311-cp311-win_amd64.whl", hash = "sha256:303d55c37565340c2d21c268c64a712fad612504cc4b98b1d1df848cac6d934f"}, + {file = "whylogs_sketching-3.4.1.dev3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4b636cebf5f4d7724437616368199c8e7b153f89dfd396f9e8279a95bf55d817"}, + {file = "whylogs_sketching-3.4.1.dev3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba4519780defebb35c4718ecc13d1b8c38894be722147a047e67b953cd2430ab"}, + {file = "whylogs_sketching-3.4.1.dev3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:b4606e5360ce922e6ad770e845c75038d873300fd8a54ea856e99003b3254fc9"}, {file = "whylogs_sketching-3.4.1.dev3-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:9d65fcf8dade1affe50181582b8894929993e37d7daa922d973a811790cd0208"}, {file = "whylogs_sketching-3.4.1.dev3-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c4845e77c208ae64ada9170e1b92ed0abe28fe311c0fc35f9d8efa6926211ca2"}, {file = "whylogs_sketching-3.4.1.dev3-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:02cac1c87ac42d7fc7e6597862ac50bc035825988d21e8a2d763b416e83e845f"}, @@ -4808,7 +4871,7 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker 
testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"] [extras] -all = ["Pillow", "Pillow", "boto3", "faster-fifo", "fugue", "google-cloud-storage", "ipython", "mlflow-skinny", "mlflow-skinny", "numpy", "numpy", "orjson", "pandas", "pyarrow", "pybars3", "pyspark", "scikit-learn", "scikit-learn", "scipy", "scipy"] +all = ["Pillow", "Pillow", "boto3", "faster-fifo", "fugue", "google-cloud-storage", "ipython", "mlflow-skinny", "mlflow-skinny", "numpy", "numpy", "orjson", "pandas", "polars", "pyarrow", "pybars3", "pyspark", "scikit-learn", "scikit-learn", "scipy", "scipy"] datasets = ["pandas"] docs = ["furo", "ipython_genutils", "myst-parser", "nbconvert", "nbsphinx", "sphinx", "sphinx-autoapi", "sphinx-autobuild", "sphinx-copybutton", "sphinx-inline-tabs", "sphinxext-opengraph"] embeddings = ["numpy", "numpy", "scikit-learn", "scikit-learn"] @@ -4816,6 +4879,7 @@ fugue = ["fugue"] gcs = ["google-cloud-storage"] image = ["Pillow", "Pillow", "numpy", "numpy"] mlflow = ["databricks-cli", "mlflow-skinny", "mlflow-skinny"] +polars = ["polars"] proc = ["faster-fifo", "orjson", "pandas"] proc-mp = ["orjson", "pandas"] s3 = ["boto3"] @@ -4825,4 +4889,4 @@ viz = ["Pillow", "Pillow", "ipython", "numpy", "numpy", "pybars3", "scipy", "sci [metadata] lock-version = "2.0" python-versions = ">=3.7.1, <4" -content-hash = "9e2ecd0f225c7e843c021525c5269f8230ee356787fc82ff5345f99c8431e6a4" +content-hash = "505c98c178ad13ef33d72f4b07f79e62c533226082925b7347f3c3682005e511" diff --git a/python/pyproject.toml b/python/pyproject.toml index 4376ff576e..3df1e5ea19 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -20,6 +20,10 @@ whylabs-client = "^0.6.5" requests = "^2.27" backoff = "^2.2.1" platformdirs = "^3.5.0" +setuptools = [ + { version = "<58", python = "<3.8", optional = false }, + 
{ version = ">=75", python = ">=3.8", optional = false } +] # viz module. Everything after this should be optional pybars3 = { version = "^0.9", optional = true } @@ -34,7 +38,11 @@ numpy = [ ] # datasets module. -pandas = { version = "*", optional = true} +pandas = { version = "*", optional = true } + +# Polars module. +polars = { version = ">=1.8.2", python = ">=3.8", optional = true } +# TODO: do we want polars-u64-idx ? # Doc dependencies sphinx = { version = "*", optional = true } @@ -110,6 +118,10 @@ spark = [ datasets = [ "pandas", ] +polars = [ + "polars", +] + gcs = [ "google-cloud-storage", ] @@ -148,6 +160,7 @@ all = [ "boto3", "google-cloud-storage", "pandas", + "polars", "pyarrow", "pyspark", "ipython", diff --git a/python/tests/api/logger/test_logger_polars.py b/python/tests/api/logger/test_logger_polars.py new file mode 100644 index 0000000000..2b64ee315d --- /dev/null +++ b/python/tests/api/logger/test_logger_polars.py @@ -0,0 +1,257 @@ +import os +import sys +import tempfile +from typing import Any + +import numpy as np +import pytest + +import whylogs as why +from whylogs.api.logger import write +from whylogs.api.logger.result_set import ResultSet, ResultSetReader +from whylogs.core import ColumnProfileView, MetricConfig +from whylogs.core.metrics import StandardMetric +from whylogs.core.resolvers import Resolver +from whylogs.core.schema import DatasetSchema +from whylogs.core.stubs import is_stub, pd, pl + +if sys.version_info < (3, 8): + pytest.skip(allow_module_level=True, reason="Polars requires Python >= 3.8") + +if is_stub(pl.DataFrame): + pytest.skip(allow_module_level=True, reason="Requires Polars") + + +FLOAT_TYPES = [float, np.float32, np.float64, np.float_] +INTEGER_TYPES = [int, np.intc, np.uintc, np.int_, np.uint, np.longlong, np.ulonglong] + + +def test_basic_log_schema() -> None: + d = {"col1": [1, 2]} + df = pl.DataFrame(d) + logger = why.logger() + results = logger.log(df, schema=DatasetSchema()) + profile = results.profile() + 
assert profile._columns["col1"]._schema.dtype == pl.Int64 + + +def test_basic_log_schema_constructor() -> None: + d = {"col1": [1, 2]} + df = pl.DataFrame(data=d) + logger = why.logger(schema=DatasetSchema()) + results = logger.log(df) + profile = results.profile() + assert profile._columns["col1"]._schema.dtype == pl.Int64 + + +def test_basic_log() -> None: + d = {"col1": [1, 2], "col2": [3.0, 4.0], "col3": ["a", "b"]} + df = pl.DataFrame(data=d) + + results = why.log(df) + + profile = results.profile() + + assert profile._columns["col1"]._schema.dtype == pl.Int64 + assert profile._columns["col2"]._schema.dtype == pl.Float64 + assert profile._columns["col3"]._schema.dtype == pl.Utf8 + + +def test_lending_club(lending_club_df: pd.DataFrame) -> None: + df = pl.from_pandas(lending_club_df) + res = why.log(df) + view = res.view() + df = view.to_pandas() + assert len(df) == 151 + + +@pytest.mark.skip("type not supported yet") +def test_categorical_dtype() -> None: + data = {"can_fly": [0, 1, 0, 0], "habitat": ["forest", "forest", "river", "river"]} + + df = pl.DataFrame(data) + df["can_fly"] = df["can_fly"].astype("category") + df["habitat"] = df["habitat"].astype("category") + + results = why.log(df) + view = results.view() + metrics = view.get_column("can_fly").get_metric("counts").to_summary_dict() + assert metrics["n"] == 4 + + +def test_roundtrip_resultset(tmp_path: Any) -> None: + d = {"col1": [1, 2], "col2": [3.0, 4.0], "col3": ["a", "b"]} + df = pl.DataFrame(data=d) + + results = why.log(df) + status, path = results.writer("local", base_name="profile.bin").option(base_dir=tmp_path).write() + assert status + roundtrip_result_set = why.read(path) + assert len(results.view().to_pandas()) == len(roundtrip_result_set.view().to_pandas()) + + +def test_profile_write(tmp_path: Any) -> None: + d = {"col1": [1, 2], "col2": [3.0, 4.0], "col3": ["a", "b"]} + df = pl.DataFrame(data=d) + results = why.log(df) + profile = results.profile() + write(profile, tmp_path, 
"test1_profile.bin") + assert os.path.isfile(os.path.join(tmp_path, "test1_profile.bin")) + path = os.path.join(tmp_path, "test2_profile.bin") + write(profile, path) + assert os.path.isfile(path) + + +@pytest.mark.parametrize("data_type", [*INTEGER_TYPES, *FLOAT_TYPES]) +def test_different_integer_types(data_type) -> None: + d = {"col1": [data_type(x) for x in [1, 3, 2, 5]]} + df = pl.DataFrame(d) + results = why.log(df) + view = results.view() + + assert isinstance(view._columns["col1"], ColumnProfileView) + assert view._columns.get("col1")._failure_count == 0 + assert view._columns.get("col1")._success_count > 0 + + view_pandas = view.to_pandas() + assert len(view_pandas) == 1 + assert len(view_pandas.columns) > 0 + + +def test_counters_dataframe_vs_row() -> None: + d = {"a": 1, "b": 2.0, "c": ["foo", "bar"]} + df = pl.DataFrame(d) + + df_results = why.log(df) + row_results = why.log(d) + + df_view = df_results.view() + row_view = row_results.view() + + view_pandas = df_view.to_pandas() + assert len(view_pandas) == 3 + assert len(view_pandas.columns) > 0 + + view_row_pandas = row_view.to_pandas() + assert len(view_row_pandas) == 3 + assert len(view_row_pandas.columns) > 0 + + +@pytest.mark.parametrize( + "input", + [{"a": ["x", "y"]}, {"a": []}], # non-numeric list -> object # tensors require positive shape in every dimension +) +def test_object_count_dict(input) -> None: + row_results = why.log(input) + row_view = row_results.view() + assert row_view._columns.get("a")._success_count == 2 + assert row_view._columns.get("a")._metrics.get("types").object.value == 1 + + +def test_bool_count(): + data = { + "animal": ["cat", "hawk", "snake", "cat"], + "fly": [False, True, False, False], + "legs": [4, 2, 0, 4], + } + + df = pl.DataFrame(data) + + results = why.log(dataframe=df) + prof_view = results.profile().view() + assert prof_view._columns.get("fly")._metrics.get("types").boolean.value == 4 + assert 
prof_view._columns.get("fly")._metrics.get("types").integral.value == 0 + + +def test_unicode_range_enabled() -> None: + strings = { + "words": ["1", "12", "123", "1234a", "abc", "abc123", "I😍emoticons"], + } # TODO: follow and create ranges for common emoji like ❤️ /u+fe0f + data = pl.DataFrame(strings) + digit_counts = [1, 2, 3, 4, 0, 3, 0] + latin_counts = [1, 2, 3, 5, 3, 6, 10] + emoticon_counts = [0, 0, 0, 0, 0, 0, 1] + configured_schema = DatasetSchema(default_configs=MetricConfig(track_unicode_ranges=True)) + prof_view = why.log(data, schema=configured_schema).view() + assert "words" in prof_view.get_columns() + column_profile = prof_view.get_column("words") + assert "unicode_range" in column_profile.get_metric_names() + metric = column_profile.get_metric("unicode_range") + + assert "digits" in metric.submetrics + assert "basic-latin" in metric.submetrics + assert "emoticon" in metric.submetrics + + assert metric.submetrics["digits"]["distribution"].mean.value == np.array(digit_counts).mean() + assert metric.submetrics["emoticon"]["distribution"].mean.value == np.array(emoticon_counts).mean() + assert metric.submetrics["basic-latin"]["distribution"].mean.value == np.array(latin_counts).mean() + + +def test_unicode_range_default_config_off() -> None: + strings = { + "words": ["1", "12", "123", "1234a", "abc", "abc123", "I😍emoticon"], + } + data = pl.DataFrame(strings) + + prof_view = why.log(data).view() + assert "words" in prof_view.get_columns() + column_profile = prof_view.get_column("words") + assert "unicode_range" not in column_profile.get_metric_names() + + +def test_frequent_items() -> None: + strings = { + "words": ["1", "12", "123"], + } + data = pl.DataFrame(strings) + + prof_view = why.log(data).view() + assert "words" in prof_view.get_columns() + column_profile = prof_view.get_column("words") + assert "frequent_items" in column_profile.get_metric_names() + + +def test_frequent_items_disabled() -> None: + strings = { + "words": ["1", "12", "123"], 
+ } + data = pl.DataFrame(strings) + configured_schema = DatasetSchema(default_configs=MetricConfig(fi_disabled=True)) + + prof_view = why.log(data, schema=configured_schema).view() + assert "words" in prof_view.get_columns() + column_profile = prof_view.get_column("words") + assert "frequent_items" not in column_profile.get_metric_names() + + +def test_custom_resolver() -> None: + class CustomResolver(Resolver): + """Resolver that keeps distribution metrics for Fractional and frequent items for Integral, and counters and types metrics for all data types.""" + + def resolve(self, name: str, why_type, column_schema): + metrics = [] + if name == "col1": + metrics.append(StandardMetric.counts) + result = {} + for m in metrics: + result[m.name] = m.zero(column_schema.cfg) + return result + + d = {"col1": [3.0, 4.0, 5.0]} + df = pl.DataFrame(data=d) + prof_view = why.log(df, schema=DatasetSchema(resolvers=CustomResolver())).profile().view() + + assert prof_view.get_column("col1").get_metric("counts").n.value == 3 + assert not prof_view.get_column("col1").get_metric("distribution") + + +def test_result_set_reader(profile_view): + with tempfile.NamedTemporaryFile() as tmp_file: + success, path = profile_view.write(file=tmp_file) + assert success + tmp_file.flush() + tmp_file.seek(0) + reader = why.reader(name="local") + results = reader.read(path=path) + assert isinstance(reader, ResultSetReader) + assert isinstance(results, ResultSet) diff --git a/python/tests/api/logger/test_segments_polars.py b/python/tests/api/logger/test_segments_polars.py new file mode 100644 index 0000000000..52ad2279a0 --- /dev/null +++ b/python/tests/api/logger/test_segments_polars.py @@ -0,0 +1,524 @@ +import math +import os +import pickle +import sys +import tempfile +from glob import glob +from logging import getLogger +from typing import Any + +import numpy as np +import pytest + +import whylogs as why +from whylogs.api.logger.result_set import ( + ProfileResultSet, + SegmentedResultSet, + 
ViewResultSet, +) +from whylogs.core.metrics.metrics import CardinalityMetric, DistributionMetric +from whylogs.core.schema import DatasetSchema +from whylogs.core.segment import Segment +from whylogs.core.segmentation_partition import ( + ColumnMapperFunction, + SegmentationPartition, + SegmentFilter, + segment_on_column, +) +from whylogs.core.stubs import is_stub, pl +from whylogs.core.view.dataset_profile_view import DatasetProfileView +from whylogs.migration.converters import read_v0_to_view + +if sys.version_info < (3, 8): + pytest.skip(allow_module_level=True, reason="Polars requires Python >= 3.8") + +if is_stub(pl.DataFrame): + pytest.skip(allow_module_level=True, reason="Requires Polars") + + +TEST_LOGGER = getLogger(__name__) + + +def test_single_column_segment() -> None: + input_rows = 100 + segment_column = "col3" + number_of_segments = 5 + d = { + "col1": [i for i in range(input_rows)], + "col2": [i * i * 1.1 for i in range(input_rows)], + segment_column: [f"x{str(i%number_of_segments)}" for i in range(input_rows)], + } + + df = pl.DataFrame(d) + test_segments = segment_on_column("col3") + results: SegmentedResultSet = why.log(df, schema=DatasetSchema(segments=test_segments)) + assert results.count == number_of_segments + partitions = results.partitions + assert len(partitions) == 1 + partition = partitions[0] + segments = results.segments_in_partition(partition) + assert len(segments) == number_of_segments + + first_segment = next(iter(segments)) + first_segment_profile = results.profile(first_segment) + assert first_segment_profile is not None + assert first_segment_profile._columns["col1"]._schema.dtype == pl.Int64 # np.int64 + assert first_segment_profile._columns["col2"]._schema.dtype == pl.Float64 # np.float64 + assert first_segment_profile._columns["col3"]._schema.dtype == pl.String # .name == "object" + segment_cardinality: CardinalityMetric = ( + first_segment_profile.view().get_column(segment_column).get_metric("cardinality") + ) + 
cardinality = segment_cardinality.estimate + assert cardinality is not None + assert cardinality == 1.0 + + +def test_single_column_and_manual_segment() -> None: + input_rows = 100 + segment_column = "col3" + number_of_segments = 5 + d = { + "col1": [i for i in range(input_rows)], + "col2": [i * i * 1.1 for i in range(input_rows)], + segment_column: [f"x{str(i%number_of_segments)}" for i in range(input_rows)], + } + + df = pl.DataFrame(d) + test_segments = segment_on_column("col3") + results: SegmentedResultSet = why.log( + df, schema=DatasetSchema(segments=test_segments), segment_key_values={"zzz": "foo", "ver": 1} + ) + assert results.count == number_of_segments + partitions = results.partitions + assert len(partitions) == 1 + partition = partitions[0] + segments = results.segments_in_partition(partition) + assert len(segments) == number_of_segments + + first_segment = next(iter(segments)) + # assert first_segment.key == ("x0", "1", "foo") + first_segment_profile = results.profile(first_segment) + assert first_segment_profile is not None + assert first_segment_profile._columns["col1"]._schema.dtype == pl.Int64 # np.int64 + assert first_segment_profile._columns["col2"]._schema.dtype == pl.Float64 # np.float64 + assert first_segment_profile._columns["col3"]._schema.dtype == pl.String # .name == "object" + segment_cardinality: CardinalityMetric = ( + first_segment_profile.view().get_column(segment_column).get_metric("cardinality") + ) + cardinality = segment_cardinality.estimate + assert cardinality is not None + assert cardinality == 1.0 + + +def test_throw_on_duplicate_keys() -> None: + input_rows = 100 + segment_column = "col3" + number_of_segments = 5 + d = { + "col1": [i for i in range(input_rows)], + "col2": [i * i * 1.1 for i in range(input_rows)], + segment_column: [f"x{str(i%number_of_segments)}" for i in range(input_rows)], + } + + df = pl.DataFrame(d) + test_segments = segment_on_column("col3") + + with pytest.raises(ValueError): + why.log(df, 
schema=DatasetSchema(segments=test_segments), segment_key_values={segment_column: "foo"}) + + +def test_single_column_segment_with_trace_id() -> None: + input_rows = 100 + segment_column = "col3" + number_of_segments = 5 + trace_id = "123-456" + d = { + "col1": [i for i in range(input_rows)], + "col2": [i * i * 1.1 for i in range(input_rows)], + segment_column: [f"x{str(i%number_of_segments)}" for i in range(input_rows)], + } + + df = pl.DataFrame(d) + test_segments = segment_on_column("col3") + results: SegmentedResultSet = why.log(df, schema=DatasetSchema(segments=test_segments), trace_id=trace_id) + assert results.count == number_of_segments + partitions = results.partitions + assert len(partitions) == 1 + partition = partitions[0] + segments = results.segments_in_partition(partition) + assert len(segments) == number_of_segments + + first_segment = next(iter(segments)) + first_segment_profile = results.profile(first_segment) + assert first_segment_profile is not None + assert first_segment_profile._columns["col1"]._schema.dtype == pl.Int64 # np.int64 + assert first_segment_profile._columns["col2"]._schema.dtype == pl.Float64 # np.float64 + assert first_segment_profile._columns["col3"]._schema.dtype == pl.String # .name == "object" + segment_cardinality: CardinalityMetric = ( + first_segment_profile.view().get_column(segment_column).get_metric("cardinality") + ) + cardinality = segment_cardinality.estimate + assert cardinality is not None + # cardinality is an estimate, and because this is the segment column, it should + # by definition contain only one unique value per segment. 
+ assert cardinality == 1.0 + assert results.metadata is not None + assert results.metadata["whylabs.traceId"] == trace_id + + +def test_single_integer_column_segment() -> None: + input_rows = 100 + segment_column = "col3" + number_of_segments = 5 + d = { + "col1": [i for i in range(input_rows)], + "col2": [i * i * 1.1 for i in range(input_rows)], + segment_column: [(i % number_of_segments) for i in range(input_rows)], + } + + df = pl.DataFrame(d) + test_segments = segment_on_column("col3") + results: SegmentedResultSet = why.log(df, schema=DatasetSchema(segments=test_segments)) + assert results.count == number_of_segments + partitions = results.partitions + assert len(partitions) == 1 + partition = partitions[0] + segments = results.segments_in_partition(partition) + assert len(segments) == number_of_segments + + first_segment = next(iter(segments)) + first_segment_profile = results.profile(first_segment) + assert first_segment_profile is not None + assert first_segment_profile._columns["col1"]._schema.dtype == pl.Int64 # np.int64 + assert first_segment_profile._columns["col2"]._schema.dtype == pl.Float64 # np.float64 + assert first_segment_profile._columns["col3"]._schema.dtype == pl.Int64 # np.int64 + segment_cardinality: CardinalityMetric = ( + first_segment_profile.view().get_column(segment_column).get_metric("cardinality") + ) + cardinality = segment_cardinality.estimate + assert cardinality is not None + assert cardinality == 1.0 + + +@pytest.mark.skip("Haven't figured out how to curry Polars expressions yet") +def test_filtered_single_column_segment() -> None: + input_rows = 100 + segment_column = "col3" + number_of_segments = 5 + d = { + "col1": [i for i in range(input_rows)], + "col2": [i * i * 1.1 for i in range(input_rows)], + segment_column: [f"x{str(i%number_of_segments)}" for i in range(input_rows)], + } + + df = pl.DataFrame(d) + test_segments = segment_on_column(segment_column) + test_segments[segment_column].filter = 
SegmentFilter(filter_function=(pl.col("col1") > 49)) + + results: SegmentedResultSet = why.log(df, schema=DatasetSchema(segments=test_segments)) + assert results.count == number_of_segments + partitions = results.partitions + assert len(partitions) == 1 + partition = partitions[0] + segments = results.segments_in_partition(partition) + assert len(segments) == number_of_segments + + first_segment: Segment = next(next(iter(segments))) # polars comes out in different order + first_segment_profile = results.profile(first_segment) + assert first_segment.key == ("x0",) + assert first_segment_profile is not None + assert first_segment_profile._columns["col1"]._schema.dtype == pl.Int64 # np.int64 + assert first_segment_profile._columns["col2"]._schema.dtype == pl.Float64 # np.float64 + assert first_segment_profile._columns[segment_column]._schema.dtype == pl.String # .name == "object" + segment_distribution: DistributionMetric = ( + first_segment_profile.view().get_column("col1").get_metric("distribution") + ) + assert segment_distribution is not None + count = segment_distribution.n + assert count is not None + assert count == 10 + + +@pytest.mark.parametrize("v0", [True, False]) +def test_segment_write_roundtrip_versions(tmp_path: Any, v0) -> None: + input_rows = 10 + segment_column = "col3" + number_of_segments = 2 + trace_id = "123-456" + values_per_segment = input_rows / number_of_segments + d = { + "col1": [i for i in range(input_rows)], + "col2": [i * i * 1.1 for i in range(input_rows)], + segment_column: [f"x{str(i%number_of_segments)}" for i in range(input_rows)], + } + + df = pl.DataFrame(d) + test_segments = segment_on_column(segment_column) + + results: SegmentedResultSet = why.log(df, trace_id=trace_id, schema=DatasetSchema(segments=test_segments)) + assert results.count == number_of_segments + partitions = results.partitions + assert len(partitions) == 1 + partition = partitions[0] + segments = results.segments_in_partition(partition) + assert len(segments) 
== number_of_segments + + seg_i = iter(segments) # polars segments order is non-deterministic + first_segment: Segment = next(seg_i) + if first_segment.key == ("x1",): + first_segment = next(seg_i) + first_segment_profile = results.profile(first_segment) + assert first_segment.key == ("x0",) + assert first_segment_profile is not None + assert first_segment_profile._columns["col1"]._schema.dtype == pl.Int64 # np.int64 + assert first_segment_profile._columns["col2"]._schema.dtype == pl.Float64 # np.float64 + assert first_segment_profile._columns[segment_column]._schema.dtype == pl.String # .name == "object" + segment_distribution: DistributionMetric = ( + first_segment_profile.view().get_column("col1").get_metric("distribution") + ) + count = segment_distribution.n + assert count is not None + assert count == values_per_segment + + results.writer().option(base_dir=tmp_path).write(use_v0=v0) + paths = glob(os.path.join(tmp_path) + "/*x0.bin") + assert len(paths) == 1 + roundtrip_profiles = [] + for file_path in paths: + if v0: + roundtrip_profiles.append(read_v0_to_view(path=file_path)) + else: + roundtrip_profiles.append(why.read(path=file_path).view()) + assert len(roundtrip_profiles) == 1 + post_deserialization_first_view = roundtrip_profiles[0] + assert post_deserialization_first_view is not None + assert isinstance(post_deserialization_first_view, DatasetProfileView) + + # check that trace_id is preserved round trip in metadata + assert post_deserialization_first_view.metadata + assert "whylabs.traceId" in post_deserialization_first_view.metadata + assert trace_id == post_deserialization_first_view.metadata["whylabs.traceId"] + pre_serialization_first_view = first_segment_profile.view() + pre_columns = pre_serialization_first_view.get_columns() + post_columns = post_deserialization_first_view.get_columns() + + # check that the distribution looks similar for each column profile + for column_name in pre_columns: + initial_column_profile = pre_columns[column_name] + 
def test_multi_column_segment() -> None:
    """Segment on (col1, col3): 100 unique col1 values yield 100 single-row segments."""
    row_count = 100
    frame = pl.DataFrame(
        {
            "col1": list(range(row_count)),
            "col2": [n * n * 1.1 for n in range(row_count)],
            "col3": [f"x{str(n%5)}" for n in range(row_count)],
        }
    )
    partition = SegmentationPartition(
        name="col1,col3", mapper=ColumnMapperFunction(col_names=["col1", "col3"])
    )
    results: SegmentedResultSet = why.log(
        frame, schema=DatasetSchema(segments={partition.name: partition})
    )

    # Note: this segmentation is not practically useful — one datapoint per
    # segment. The segment key is a tuple of strings (e.g. ("99", "x4")), but
    # polars segment ordering is non-deterministic so the exact key of the
    # last segment is not asserted.
    final_segment = results.segments()[-1]
    profile = results.profile(final_segment)

    assert profile._columns["col1"]._schema.dtype == pl.Int64  # np.int64
    assert profile._columns["col2"]._schema.dtype == pl.Float64  # np.float64
    assert profile._columns["col3"]._schema.dtype == pl.String  # .name == "object"

    dist: DistributionMetric = profile.view().get_column("col1").get_metric("distribution")
    assert dist.n is not None
    assert dist.n == 1
def test_multicolumn_and_manual_segment() -> None:
    """Column-mapped segmentation combined with manual segment_key_values."""
    row_count = 100
    frame = pl.DataFrame(
        {
            "col1": list(range(row_count)),
            "col2": [n * n * 1.1 for n in range(row_count)],
            "col3": [f"x{str(n%5)}" for n in range(row_count)],
        }
    )
    partition = SegmentationPartition(
        name="col1,col3", mapper=ColumnMapperFunction(col_names=["col1", "col3"])
    )
    results: SegmentedResultSet = why.log(
        frame,
        schema=DatasetSchema(segments={partition.name: partition}),
        segment_key_values={"ver": 42, "zzz": "bar"},
    )

    # One datapoint per segment here; the manual key/values are appended to
    # the mapped key (e.g. ("99", "x4", "42", "bar")), but segment ordering
    # is non-deterministic so the exact key is not asserted.
    final_segment = results.segments()[-1]
    profile = results.profile(final_segment)

    assert profile._columns["col1"]._schema.dtype == pl.Int64  # np.int64
    assert profile._columns["col2"]._schema.dtype == pl.Float64  # np.float64
    assert profile._columns["col3"]._schema.dtype == pl.String  # .name == "object"

    dist: DistributionMetric = profile.view().get_column("col1").get_metric("distribution")
    assert dist.n is not None
    assert dist.n == 1
def test_multi_column_segment_serialization_roundtrip_v0(tmp_path: Any) -> None:
    """Write 35 single-row segments as v0 binaries and read each one back."""
    row_count = 35
    frame = pl.DataFrame(
        {
            "A": [n % 7 for n in range(row_count)],
            "B": [f"x{str(n%5)}" for n in range(row_count)],
        }
    )
    partition = SegmentationPartition(name="A,B", mapper=ColumnMapperFunction(col_names=["A", "B"]))
    results: SegmentedResultSet = why.log(
        frame, schema=DatasetSchema(segments={partition.name: partition})
    )
    results.writer().option(base_dir=tmp_path).write(use_v0=True)

    # one .bin file per segment == one per input row
    paths = glob(os.path.join(tmp_path) + "/*.bin")
    assert len(paths) == row_count

    restored = [read_v0_to_view(file_path) for file_path in paths]
    assert len(restored) == row_count
    TEST_LOGGER.info(restored)
    TEST_LOGGER.info(restored[15])

    sample_view = restored[15]
    assert sample_view is not None
    assert isinstance(sample_view, DatasetProfileView)

    columns = sample_view.get_columns()
    assert "A" in columns
    assert "B" in columns
def test_pickle_load_merge_profile_view() -> None:
    """A DatasetProfileView must survive a pickle round trip and still merge."""
    frame = pl.DataFrame({"col1": [1, 2]})
    logger = why.logger()
    results = logger.log(frame)
    single_row_view = logger.log({"col1": 3}).view()

    restored_view = None
    with tempfile.NamedTemporaryFile() as handle:
        pickle.dump(results.view(), handle)
        handle.flush()
        handle.seek(0)
        restored_view = pickle.load(handle)

    assert restored_view is not None
    assert isinstance(restored_view, DatasetProfileView)

    # 2 integer rows from the pickled view + 1 from the fresh view
    combined = single_row_view.merge(restored_view)
    assert combined._columns["col1"]._metrics["types"].integral.value == 3
def test_segment_with_nans() -> None:
    """Verify segment row counts when the segmented column contains missing values.

    col_nan mixes 0.0, None, np.nan and math.nan so every "missing" spelling
    is exercised by the segment mapper.
    """
    df = pl.DataFrame({"col_1": [1, 2, 3, 4, 5, 6], "col_nan": [0.0, 0.0, None, None, np.nan, math.nan]})
    column_segments = segment_on_column("col_nan")
    schema = DatasetSchema(segments=column_segments)
    profile_results = why.log(df, schema=schema)
    assert profile_results.count == 3  # col_nan = True
    segment = profile_results.segments()[0]
    segmented_view = profile_results.profile(segment).view()
    # this segment holds exactly two rows (presumably the 0.0 pair — segment
    # ordering is not asserted elsewhere in this file; TODO confirm)
    assert segmented_view.get_column("col_nan").get_metric("counts").to_summary_dict()["n"] == 2

    segmentation_partition = SegmentationPartition(
        name="col_1,col_nan", mapper=ColumnMapperFunction(col_names=["col_1", "col_nan"])
    )
    multi_column_segments = {segmentation_partition.name: segmentation_partition}
    schema = DatasetSchema(segments=multi_column_segments)

    # col_1 is unique per row, so pairing it with col_nan yields one segment per row
    profile_results = why.log(df, schema=schema)
    assert profile_results.count == 6  # (1,True), (2,True), (3,nan), (4,nan), (5,nan), (6,nan)
    for segment in profile_results.segments():
        segmented_view = profile_results.profile(segment).view()
        # each segment has n=1
        assert segmented_view.get_column("col_nan").get_metric("counts").to_summary_dict()["n"] == 1
n=1 + assert segmented_view.get_column("col_nan").get_metric("counts").to_summary_dict()["n"] == 1 diff --git a/python/tests/core/metrics/test_metrics_polars.py b/python/tests/core/metrics/test_metrics_polars.py new file mode 100644 index 0000000000..feb10bbcfd --- /dev/null +++ b/python/tests/core/metrics/test_metrics_polars.py @@ -0,0 +1,238 @@ +import sys +from logging import getLogger + +import numpy as np +import pytest + +import whylogs as why +from whylogs.core import ColumnProfileView, DatasetSchema +from whylogs.core.datatypes import AnyType +from whylogs.core.metrics import StandardMetric +from whylogs.core.metrics.metrics import ( + CardinalityMetric, + DistributionMetric, + MetricConfig, +) +from whylogs.core.preprocessing import PreprocessedColumn +from whylogs.core.resolvers import MetricSpec, ResolverSpec +from whylogs.core.schema import DeclarativeSchema +from whylogs.core.stubs import is_stub, pd, pl + +if sys.version_info < (3, 8): + pytest.skip(allow_module_level=True, reason="Polars requires Python >= 3.8") + +if is_stub(pl.DataFrame): + pytest.skip(allow_module_level=True, reason="Requires Polars") + + +TEST_LOGGER = getLogger(__name__) + + +def test_distribution_metrics_series() -> None: + dist = DistributionMetric.zero(MetricConfig()) + data = pl.Series(list(range(100))) + col = PreprocessedColumn.apply(data) + dist.columnar_update(col) + + assert dist.kll.value.get_n() == 100 + assert dist.mean.value == data.mean() + assert dist.variance == data.var() + + +def test_distribution_variance_m2() -> None: + import statistics + + dist_list = DistributionMetric.zero(MetricConfig()) + dist_pandas = DistributionMetric.zero(MetricConfig()) + dist_polars = DistributionMetric.zero(MetricConfig()) + dist_numpy = DistributionMetric.zero(MetricConfig()) + test_input = [1, 2, 3, 4] + + list_test_input = PreprocessedColumn() + list_test_input.list.ints = test_input + n = len(test_input) + mean = sum(test_input) / n + variance = 
def test_distribution_metrics_indexed_series_single_row() -> None:
    """Distribution metric over a one-element polars Series."""
    metric = DistributionMetric.zero(MetricConfig())
    series = pl.Series(list(range(1)))
    processed = PreprocessedColumn.apply(series)
    metric.columnar_update(processed)

    assert metric.kll.value.get_n() == 1
    assert metric.mean.value == series.mean()
def test_merge_single_values_profile_mean() -> None:
    """Merging 30 single-row profile views must match one 30-row profile."""
    values = list(range(30))
    frame = pl.DataFrame({"col1": values})
    expected_mean = frame["col1"].mean()
    expected_stddev = frame["col1"].std()

    bulk_dist = why.log(frame).profile().view().get_column("col1").get_metric("distribution")
    bulk_mean = bulk_dist.mean.value
    bulk_stddev = bulk_dist.stddev

    # one profile view per value, folded together pairwise
    single_views = [why.log(row={"col1": v}).profile().view() for v in values]
    merged_view = single_views[0]
    for view in single_views[1:]:
        merged_view = merged_view.merge(view)
    merged_dist = merged_view.get_column("col1").get_metric("distribution")

    assert round(expected_mean, 3) == round(bulk_mean, 3)
    assert round(expected_mean, 3) == round(merged_dist.mean.value, 3)
    assert round(expected_stddev, 3) == round(bulk_stddev, 3)
    assert round(expected_stddev, 3) == round(merged_dist.stddev, 3)
def test_frequent_items_bounds_order() -> None:
    """Frequent-items estimates must lie between their lower and upper bounds."""
    gamma_frame = pl.DataFrame({"feature1": np.random.gamma(1, 2, 1000).astype(int)})
    uniform_frame = pl.DataFrame({"feature1": np.random.randint(10000, size=9000)})
    combined = pl.concat([gamma_frame, uniform_frame])

    frequent = why.log(combined).view().to_pandas()["frequent_items/frequent_strings"]
    top_item = frequent.array[0][0]
    assert top_item.lower <= top_item.est <= top_item.upper
def test_cardinality_metric_booleans() -> None:
    """A boolean Series containing both values has cardinality ~2."""
    metric = CardinalityMetric.zero(MetricConfig())
    series = pl.Series([True, False, True, True])
    metric.columnar_update(PreprocessedColumn.apply(series))

    assert metric.estimate == pytest.approx(2, 0.1)
# TODO: this is from the baseline benchmark, but its not integrated with our metrics
@dataclass
class CustomHistogramMetric:
    """Minimal KLL-sketch histogram used as a benchmark baseline.

    Bug fix: the original used ``field(default=ds.kll_floats_sketch(...))``,
    which builds ONE sketch at class-definition time that is then shared by
    every instance — separate metrics would silently accumulate into the same
    sketch. ``default_factory`` gives each instance its own sketch.
    """

    histogram: ds.kll_floats_sketch = field(
        default_factory=lambda: ds.kll_floats_sketch(MetricConfig().kll_k),
    )

    def track(self, val: Any) -> "CustomHistogramMetric":
        """Fold *val* into the sketch, skipping NA values; returns self for chaining."""
        if pd.isna(val):
            return self
        self.histogram.update(val)
        return self
@pytest.mark.load
@pytest.mark.parametrize("test_resolver", _TEST_RESOLVERS)
def test_track_column_benchmark(test_resolver: Resolver) -> None:
    """Profile ColumnProfile.track_column over a wide random dataframe.

    Bug fix: the original ended with a bare ``assert False`` (apparently a
    leftover to force pytest to dump the captured profiling logs), which made
    this benchmark unconditionally fail. It has been removed; the real
    assertion on the distribution metric remains.
    """
    dataframe_shapes = [(3400000, 43)]  # 10x less rows to estimate
    TEST_LOGGER.info(f"Running test_track_column_benchmark with {len(dataframe_shapes)} different test dataframes")
    for num_rows, num_columns in dataframe_shapes:
        TEST_LOGGER.info(f"shape of test dataframe is ({num_rows},{num_columns})...")

        profiler = cProfile.Profile()
        string_output_stream = StringIO()
        profiler.enable()
        for column_index in range(num_columns):
            column_name = str(column_index)
            col_df = pl.DataFrame({column_name: np.random.random(size=(num_rows,))})
            col_prof = ColumnProfile(
                name="perf_test", schema=ColumnSchema(float, resolver=test_resolver), cache_size=1024
            )
            if column_index == 0:
                TEST_LOGGER.info(
                    f"using the following trackers {[metric for metric in col_prof._metrics]} and {col_prof._schema.resolver}"
                )
            col_prof.track_column(col_df[column_name])
        profiler.disable()
        stats = pstats.Stats(profiler, stream=string_output_stream).sort_stats("cumulative")
        stats.print_stats(10)
        TEST_LOGGER.info(
            f"track_column stats using the following trackers {[metric_name for metric_name in col_prof._metrics]} "
            f"are\n{string_output_stream.getvalue()}"
        )
        assert col_prof.view().get_metric("distribution") is not None
only to estimate in reasonable time + TEST_LOGGER.info(f"Running custom_metric.track with {len(dataframe_shapes)} different test dataframes") + for num_rows, num_columns in dataframe_shapes: + TEST_LOGGER.info(f"shape of test dataframe is ({num_rows},{num_columns})...") + + profiler = cProfile.Profile() + string_output_stream = StringIO() + baseline_metric = CustomHistogramMetric() + profiler.enable() + for column_index in range(num_columns): + column_name = str(column_index) + baseline_metric = CustomHistogramMetric() + col_df = pl.DataFrame({column_name: np.random.random(size=(num_rows,))}) + if column_index == 0: + TEST_LOGGER.info(f"using the following trackers {baseline_metric}") + for value in col_df[column_name]: + baseline_metric.track(value) + TEST_LOGGER.info(f"\tcolumn: {column_index}") + + profiler.disable() + stats = pstats.Stats(profiler, stream=string_output_stream).sort_stats("cumulative") + stats.print_stats(20) + TEST_LOGGER.info( + f"stats for baseline_benchmark (custom_metric.track) on df({num_rows},{num_columns}) " + f"are\n{string_output_stream.getvalue()}" + ) + + +def _gen_test_row_message(i: int) -> Dict[str, Any]: + additional_fields = 40 + test_message = { + "jobtitle": "software engineer", + "employer": "whylabs", + "city": "seattle", + "state": "washington", + "country": "united states", + "date": "2022-11-02", + "optional_features": i % 100, + "nan_feature": float("nan") if i % 13 == 0 else float(i) / 3.3, + "None_feature": None if i % 3 == 0 else {"a": 1}, + "debug": True, + } + for i in range(additional_fields): + test_message[f"field_{i}"] = random.random() + return test_message + + +def _gen_test_df() -> pl.DataFrame: + num_rows = 1 + num_columns = 50 + full_df = pl.DataFrame({str(i): np.random.random(size=(num_rows,)) for i in range(num_columns)}) + return full_df + + +@pytest.mark.load +def test_rolling_logger_latency_row_benchmark() -> None: + number_of_iterations = 1000 + TEST_LOGGER.info(f"Running latency test with 
@pytest.mark.load
@pytest.mark.parametrize("mode", ["pandas_to_row", "row", "pandas"])
def test_rolling_logger_latency_benchmark(mode) -> None:
    """Profile per-call latency of the rolling logger for each payload shape."""
    number_of_iterations = 1000
    TEST_LOGGER.info(f"Running latency test with {number_of_iterations} iterations")
    test_log = whylogs.logger(mode="rolling", interval=60, when="S", fork=True)
    test_log.append_writer("local")

    def build_payload(iteration):
        # one payload per iteration, shaped according to the parametrized mode
        if mode == "pandas":
            return _gen_test_df()
        if mode == "row":
            return _gen_test_row_message(iteration)
        if mode == "pandas_to_row":
            df = _gen_test_df()
            records = df.to_dict(as_series=False)
            return {k: v[0] for k, v in records.items()}
        raise ValueError(f"Mode: ({mode}) not supported, must be 'row', 'pandas'...")

    profiler = cProfile.Profile()
    string_output_stream = StringIO()
    profiler.enable()

    for iteration in range(number_of_iterations):
        test_log.log(build_payload(iteration))

    test_log.close()
    profiler.disable()
    stats = pstats.Stats(profiler, stream=string_output_stream).sort_stats("cumulative")
    stats.print_stats(20)
    TEST_LOGGER.info(f"stats for rolling latency [{mode}]based benchmark are\n{string_output_stream.getvalue()}")
def test_udf_polars() -> None:
    """Two UDF columns that copy col1 must profile identically to col1 itself."""
    schema = UdfSchema(
        STANDARD_RESOLVER,
        udf_specs=[UdfSpec(column_names=["col1"], udfs={"col2": lambda x: x[0], "col3": lambda x: x[0]})],
    )
    data = pl.DataFrame({"col1": [42, 12, 7]})
    results = why.log(data, schema=schema).view()

    summaries = [results.get_column(name).to_summary_dict() for name in ("col1", "col2", "col3")]
    assert summaries[0] == summaries[1] == summaries[2]
    # UDF columns are added to the profile, never to the input frame
    assert len(data.columns) == 1
def test_drop_columns() -> None:
    """Dropped inputs vanish from the profile while UDFs consuming them still run."""
    schema = udf_schema(drop_columns={"xx1", "xx2"}, schema_name="polars")
    df = pl.DataFrame({"xx1": [42, 7], "xx2": [3.14, 2.72]})
    results = why.log(df, schema=schema).view()

    for dropped in ("xx1", "xx2"):
        assert results.get_column(dropped) is None
    # UDFs that needed the dropped columns as input still work
    for produced in ("f1.foo", "f1.bar", "blah.foo", "blah.bar", "foo", "bar"):
        assert results.get_column(produced) is not None
def test_decorator_polars() -> None:
    """Registered (add5) and spec-supplied (sqr) UDF columns all get a distribution metric."""
    extra_spec = UdfSpec(["col1"], {"sqr": square})
    schema = udf_schema([extra_spec], STANDARD_RESOLVER, schema_name="polars-unit-tests")
    data = pl.DataFrame({"col1": [42, 12, 7], "col2": ["a", "b", "c"]})
    results = why.log(data, schema=schema).view()

    for column_name in ("col1", "add5", "sqr"):
        assert "distribution/n" in results.get_column(column_name).to_summary_dict()
results.get_column("col2").to_summary_dict() + assert "distribution/n" in col2_summary + assert "cardinality/est" in col2_summary + add5_summary = results.get_column("add5").to_summary_dict() + assert "distribution/n" in add5_summary + assert "cardinality/est" in add5_summary + plus1_summary = results.get_column("annihilate_me").to_summary_dict() + assert "ints/max" in plus1_summary + assert "distribution/n" not in plus1_summary + assert "cardinality/est" not in plus1_summary + + +@register_dataset_udf(["col1"], "colliding_name", namespace="pluto", schema_name="polars-unit-tests") +def a_function(x): + return x[0] + + +@register_dataset_udf(["col1"], "colliding_name", namespace="neptune", schema_name="polars-unit-tests") +def another_function(x): + return x[0] + + +@register_dataset_udf(["col1", "col2"], "product", schema_name="polars-unit-tests") +def times(x: Tuple) -> float: + return x[0] * x[1] + + +@register_dataset_udf( + ["col1", "col3"], metrics=[MetricSpec(StandardMetric.distribution.value)], schema_name="polars-unit-tests" +) +def ratio(x: Tuple) -> float: + return x[0] / x[1] + + +def test_multicolumn_udf_pandas() -> None: + count_only = [ + ResolverSpec( + column_type=Integral, + metrics=[MetricSpec(StandardMetric.counts.value)], + ), + ResolverSpec( + column_type=Fractional, + metrics=[MetricSpec(StandardMetric.counts.value)], + ), + ResolverSpec( + column_type=String, + metrics=[MetricSpec(StandardMetric.counts.value)], + ), + ] + + extra_spec = UdfSpec(["col1"], {"sqr": square}) + schema = udf_schema([extra_spec], count_only, schema_name="polars-unit-tests") + data = pl.DataFrame({"col1": [42, 12, 7], "col2": [2, 3, 4], "col3": [2, 3, 4]}) + results = why.log(data, schema=schema).view() + col1_summary = results.get_column("col1").to_summary_dict() + assert "counts/n" in col1_summary + col2_summary = results.get_column("col2").to_summary_dict() + assert "counts/n" in col2_summary + col3_summary = results.get_column("col3").to_summary_dict() + assert 
"counts/n" in col3_summary + add5_summary = results.get_column("add5").to_summary_dict() + assert "counts/n" in add5_summary + prod_summary = results.get_column("product").to_summary_dict() + assert prod_summary["counts/n"] == 3 + sqr_summary = results.get_column("sqr").to_summary_dict() + assert "counts/n" in sqr_summary + div_summary = results.get_column("ratio").to_summary_dict() + assert div_summary["distribution/n"] == 3 + # Integral -> counts plus registered distribution + assert results.get_column("ratio").get_metric("counts") is not None + assert results.get_column("ratio").get_metric("distribution") is not None + + +n: int = 0 + + +@register_dataset_udf(["oops"], schema_name="polars-unit-tests") +def exothermic(x: pl.DataFrame) -> pl.Series: + global n + n += 1 + if n < 3: + raise ValueError("kaboom") + + return x["oops"] + + +def test_udf_throws_polars() -> None: + global n + n = 0 + schema = udf_schema(schema_name="polars-unit-tests") + df = pl.DataFrame({"oops": [1, 2, 3, 4], "ok": [5, 6, 7, 8]}) + results = why.log(df, schema=schema).view() + assert "exothermic" in results.get_columns() + oops_summary = results.get_column("exothermic").to_summary_dict() + assert oops_summary["counts/nan"] > 0 + ok_summary = results.get_column("ok").to_summary_dict() + assert ok_summary["counts/n"] == 4 + + +@register_metric_udf("foo") +def bar(x: Any) -> Any: + return x + + +def test_udf_metric_resolving() -> None: + schema = udf_schema(schema_name="polars-unit-tests") + df = pl.DataFrame({"col1": [1, 2, 3], "foo": [1, 2, 3]}) + results = why.log(df, schema=schema).view() + assert "add5" in results.get_columns() + assert results.get_column("add5").to_summary_dict()["counts/n"] == 3 + assert results.get_column("col1").to_summary_dict()["counts/n"] == 3 + foo_summary = results.get_column("foo").to_summary_dict() + assert "udf/bar:counts/n" in foo_summary + + +def test_udf_segmentation_polars() -> None: + column_segments = segment_on_column("product") + segmented_schema = 
udf_schema(segments=column_segments, schema_name="polars-unit-tests") + data = pl.DataFrame({"col1": [42, 12, 7], "col2": [2, 3, 4], "col3": [2, 3, 4]}) + results = why.log(data, schema=segmented_schema) + assert len(results.segments()) == 3 + + +def test_udf_segmentation_obj() -> None: + column_segments = segment_on_column("product") + segmented_schema = udf_schema(segments=column_segments, schema_name="polars-unit-tests") + data = {"col1": 42, "col2": 2, "col3": 2} + results = why.log(data, schema=segmented_schema) + assert len(results.segments()) == 1 + + +def test_udf_track() -> None: + schema = udf_schema(schema_name="polars-unit-tests") + prof = DatasetProfile(schema) + data = pl.DataFrame({"col1": [42, 12, 7], "col2": [2, 3, 4], "col3": [2, 3, 4]}) + prof.track(data) + results = prof.view() + col1_summary = results.get_column("col1").to_summary_dict() + assert "counts/n" in col1_summary + col2_summary = results.get_column("col2").to_summary_dict() + assert "counts/n" in col2_summary + col3_summary = results.get_column("col3").to_summary_dict() + assert "counts/n" in col3_summary + add5_summary = results.get_column("add5").to_summary_dict() + assert "counts/n" in add5_summary + prod_summary = results.get_column("product").to_summary_dict() + assert prod_summary["counts/n"] == 3 + div_summary = results.get_column("ratio").to_summary_dict() + assert div_summary["distribution/n"] == 3 + + +@register_dataset_udf(["schema.col1"], schema_name="polars-bob") +def bob(x: pl.DataFrame) -> pl.Series: + return x["schema.col1"] + + +@register_metric_udf("schema.col1", schema_name="polars-bob") +def rob(x: Any) -> Any: + return x + + +@register_dataset_udf(["schema.col1"], "add5", schema_name="polars") +def fob(x: pl.DataFrame) -> pl.Series: + return x["schema.col1"] + 5 + + +def test_direct_udfs() -> None: + schema = udf_schema(schema_name=["polars", "polars-bob"]) + data = pl.DataFrame({"col1": [42, 12, 7]}) + more_data, _ = schema.apply_udfs(dataframe=data) + 
udf_columns = set(more_data.columns) + + result = why.log(data, schema=schema).view() + profile_columns = set(result.get_columns()) + assert udf_columns == profile_columns + + result = why.log(more_data, schema=schema).view() + more_columns = set(result.get_columns()) + assert more_columns == profile_columns + + +@register_type_udf(Fractional, schema_name="polars-unit-tests") +def square_type(x: pl.Series) -> pl.Series: + return x * x + + +def test_type_udf_dataframe() -> None: + schema = udf_schema(schema_name="polars-unit-tests") + data = pl.DataFrame({"col1": [3.14, 42.0]}) + results = why.log(data, schema=schema).view() + assert "col1.square_type" in results.get_columns().keys() + summary = results.get_column("col1.square_type").to_summary_dict() + assert summary["counts/n"] == 2 + assert summary["types/fractional"] == 2 + + +@register_type_udf(float, schema_name="polars-unit-tests") +def square_python_type(x: pl.Series) -> pl.Series: + return x * x + + +def test_python_type_udf() -> None: + schema = udf_schema(schema_name="polars-unit-tests") + data = pl.DataFrame({"col1": [3.14, 42.0]}) + results = why.log(data, schema=schema).view() + assert "col1.square_python_type" in results.get_columns().keys() + summary = results.get_column("col1.square_python_type").to_summary_dict() + assert summary["counts/n"] == 2 + assert summary["types/fractional"] == 2 diff --git a/python/whylogs/api/logger/__init__.py b/python/whylogs/api/logger/__init__.py index 15f666173c..1376b278ac 100644 --- a/python/whylogs/api/logger/__init__.py +++ b/python/whylogs/api/logger/__init__.py @@ -26,16 +26,17 @@ notebook_session_log_comparison, ) from whylogs.core import DatasetProfile, DatasetSchema +from whylogs.core.dataframe_wrapper import DataFrame, DataFrameWrapper from whylogs.core.metadata import WHYLABS_TRACE_ID_KEY from whylogs.core.model_performance_metrics.model_performance_metrics import ( ModelPerformanceMetrics, ) -from whylogs.core.stubs import pd +from whylogs.core.stubs 
import pd, pl from whylogs.core.utils import deprecated_argument diagnostic_logger = logging.getLogger(__name__) -Loggable = Union["pd.DataFrame", List[Dict[str, Any]]] +Loggable = Union["pd.DataFrame", "pl.DataFrame", DataFrameWrapper, List[Dict[str, Any]]] @deprecated_argument("debug_event") @@ -43,6 +44,7 @@ def log( obj: Any = None, *, pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrame] = None, row: Optional[Dict[str, Any]] = None, schema: Optional[DatasetSchema] = None, name: Optional[str] = None, @@ -68,11 +70,18 @@ def log( return result_set else: result_set = TransientLogger(schema=schema).log( - obj, pandas=pandas, row=row, name=name, trace_id=trace_id, tags=tags, segment_key_values=segment_key_values + obj, + pandas=pandas, + dataframe=dataframe, + row=row, + name=name, + trace_id=trace_id, + tags=tags, + segment_key_values=segment_key_values, ) if dataset_timestamp is not None: result_set.set_dataset_timestamp(dataset_timestamp) - notebook_session_log(result_set, obj, pandas=pandas, row=row, name=name) + notebook_session_log(result_set, obj, pandas=pandas, dataframe=dataframe, row=row, name=name) if debug_event is not None: if trace_id is None and WHYLABS_TRACE_ID_KEY in result_set.metadata: diff --git a/python/whylogs/api/logger/experimental/logger/actor/data_logger.py b/python/whylogs/api/logger/experimental/logger/actor/data_logger.py index 11ebec8f10..dc8b923816 100644 --- a/python/whylogs/api/logger/experimental/logger/actor/data_logger.py +++ b/python/whylogs/api/logger/experimental/logger/actor/data_logger.py @@ -1,6 +1,7 @@ from abc import abstractmethod from typing import Any, Dict, Generic, List, Optional, TypeVar, Union +# TODO: stubs? 
try: import pandas as pd # type: ignore except ImportError: diff --git a/python/whylogs/api/logger/experimental/logger/actor/thread_rolling_logger.py b/python/whylogs/api/logger/experimental/logger/actor/thread_rolling_logger.py index 3b68ce34a7..4565172874 100644 --- a/python/whylogs/api/logger/experimental/logger/actor/thread_rolling_logger.py +++ b/python/whylogs/api/logger/experimental/logger/actor/thread_rolling_logger.py @@ -27,6 +27,7 @@ from whylogs.api.writer import Writer from whylogs.api.writer.writer import Writable from whylogs.core import DatasetProfile, DatasetProfileView, DatasetSchema +from whylogs.core.input_resolver import _dataframe_or_dict # pyright: ignore[reportPrivateUsage,reportUnknownVariableType] from whylogs.core.view.segmented_dataset_profile_view import SegmentedDatasetProfileView try: @@ -70,12 +71,16 @@ def _track_segments(self, data: TrackData) -> None: if self._schema: if isinstance(data, List): - input_data = [self._schema._run_udfs(pandas=None, row=it)[1] for it in data] # type: ignore + input_data = [ + self._schema._run_udfs(df=None, row=it)[1] # pyright: ignore[reportUnknownArgumentType,reportPrivateUsage] + for it in data # pyright: ignore[reportUnknownArgumentType,reportUnknownVariableType,reportPrivateUsage] + ] # pyright: ignore[reportPrivateUsage, reportUnknownMemberType, reportUnknownArgumentType, reportUnknownVariableType] else: df = data if isinstance(data, pd.DataFrame) else None row = data if isinstance(data, dict) else None # pyright: ignore[reportUnknownVariableType] - df, row = self._schema._run_udfs(df, row) # type: ignore + df, row = _dataframe_or_dict(df, None, row) # pyright: ignore[reportUnknownArgumentType] + df, row = self._schema._run_udfs(df, row) # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType, reportPrivateUsage] + input_data: TrackData = cast(TrackData, df if df is not None else row) # type: ignore[no-redef] 
else: input_data = data diff --git a/python/whylogs/api/logger/logger.py b/python/whylogs/api/logger/logger.py index 52605c581d..b008a695f2 100644 --- a/python/whylogs/api/logger/logger.py +++ b/python/whylogs/api/logger/logger.py @@ -15,8 +15,9 @@ from whylogs.api.store import ProfileStore from whylogs.api.writer import Writer, Writers from whylogs.core import DatasetProfile, DatasetSchema +from whylogs.core.dataframe_wrapper import DataFrame from whylogs.core.errors import LoggingError -from whylogs.core.input_resolver import _pandas_or_dict +from whylogs.core.input_resolver import _dataframe_or_dict from whylogs.core.metadata import ( _populate_common_profile_metadata, _safe_merge_metadata, @@ -70,7 +71,7 @@ def _get_matching_profiles( self, obj: Any = None, *, - pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrame] = None, row: Optional[Dict[str, Any]] = None, schema: Optional[DatasetSchema] = None, ) -> List[DatasetProfile]: @@ -81,6 +82,7 @@ def log( obj: Any = None, *, pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrame] = None, row: Optional[Dict[str, Any]] = None, schema: Optional[DatasetSchema] = None, timestamp_ms: Optional[int] = None, # Not the dataset timestamp, but the timestamp of the data @@ -97,7 +99,7 @@ def log( """ if self._is_closed: raise LoggingError("Cannot log to a closed logger") - if obj is None and pandas is None and row is None: + if obj is None and pandas is None and dataframe is None and row is None: # TODO: check for shell environment and emit more verbose error string to let user know how to correct. 
raise LoggingError("log() was called without passing in any input!") @@ -106,17 +108,17 @@ def log( self._metadata = dict() self._metadata["name"] = name active_schema = schema or self._schema + dataframe, row = _dataframe_or_dict(obj, dataframe if dataframe is not None else pandas, row) if active_schema: - pandas, row = _pandas_or_dict(obj, pandas, row) - obj = None - pandas, row = active_schema._run_udfs(pandas, row) + dataframe, row = active_schema._run_udfs(dataframe, row) # If segments are defined use segment_processing to return a SegmentedResultSet if active_schema and active_schema.segments: segmented_results: SegmentedResultSet = segment_processing( schema=active_schema, - obj=obj, - pandas=pandas, + obj=None, + pandas=None, + dataframe=dataframe, row=row, segment_cache=self._segment_cache, segment_key_values=segment_key_values, @@ -126,10 +128,10 @@ def log( _safe_merge_metadata(default_metadata=segmented_results.metadata, incoming_metadata=active_schema.metadata) return segmented_results - profiles = self._get_matching_profiles(obj, pandas=pandas, row=row, schema=active_schema) + profiles = self._get_matching_profiles(obj, dataframe=dataframe, row=row, schema=active_schema) for prof in profiles: - prof.track(obj, pandas=pandas, row=row, execute_udfs=False) + prof.track(None, dataframe=dataframe, row=row, execute_udfs=False) prof._metadata = _populate_common_profile_metadata(prof._metadata, trace_id=trace_id, tags=tags) if active_schema: _safe_merge_metadata(prof._metadata, active_schema.metadata) diff --git a/python/whylogs/api/logger/rolling.py b/python/whylogs/api/logger/rolling.py index 1906683a5f..4d8f83398e 100644 --- a/python/whylogs/api/logger/rolling.py +++ b/python/whylogs/api/logger/rolling.py @@ -14,7 +14,7 @@ from whylogs.api.logger.segment_cache import SegmentCache from whylogs.api.writer import Writer from whylogs.core import DatasetProfile, DatasetProfileView, DatasetSchema -from whylogs.core.stubs import pd +from 
whylogs.core.dataframe_wrapper import DataFrame from whylogs.core.view.segmented_dataset_profile_view import SegmentedDatasetProfileView logger = logging.getLogger(__name__) @@ -147,7 +147,7 @@ def _get_matching_profiles( self, obj: Any = None, *, - pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrame] = None, row: Optional[Dict[str, Any]] = None, schema: Optional[DatasetSchema] = None, ) -> List[DatasetProfile]: diff --git a/python/whylogs/api/logger/segment_processing.py b/python/whylogs/api/logger/segment_processing.py index f4bd30dfbc..5288eaea6e 100644 --- a/python/whylogs/api/logger/segment_processing.py +++ b/python/whylogs/api/logger/segment_processing.py @@ -6,8 +6,9 @@ from whylogs.api.logger.result_set import SegmentedResultSet from whylogs.api.logger.segment_cache import SegmentCache from whylogs.core import DatasetSchema +from whylogs.core.dataframe_wrapper import DataFrame, DataFrameWrapper from whylogs.core.dataset_profile import DatasetProfile -from whylogs.core.input_resolver import _pandas_or_dict +from whylogs.core.input_resolver import _dataframe_or_dict from whylogs.core.segment import Segment from whylogs.core.segmentation_partition import ( ColumnMapperFunction, @@ -35,7 +36,11 @@ def _process_segment( if profile is None: profile = DatasetProfile(schema) - profile.track(segmented_data, execute_udfs=False) + if isinstance(segmented_data, DataFrameWrapper): + profile.track(dataframe=segmented_data, execute_udfs=False) + else: + profile.track(segmented_data, execute_udfs=False) + segments[segment_key] = profile @@ -62,7 +67,7 @@ def _process_simple_partition( schema: DatasetSchema, segments: Dict[Segment, Any], columns: List[str], - pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrameWrapper] = None, row: Optional[Mapping[str, Any]] = None, segment_cache: Optional[SegmentCache] = None, segment_key_values: Optional[Dict[str, str]] = None, @@ -70,23 +75,22 @@ def _process_simple_partition( explicit_keys = ( 
tuple(str(segment_key_values[k]) for k in sorted(segment_key_values.keys())) if segment_key_values else tuple() ) - if pandas is not None: - # simple means we can segment on column values - grouped_data = pandas.groupby(columns) - for group in grouped_data.groups.keys(): + if dataframe is not None: + group_keys = dataframe.group_keys(columns) + for group in group_keys: if isinstance(group, tuple) and any([_is_nan(x) for x in group]): evaluations = [] for val, col in zip(group, columns): if _is_nan(val): - evaluations.append((pandas[col].isna())) + evaluations.append(dataframe.get_nan_mask(col)) else: - evaluations.append((pandas[col] == val)) + evaluations.append(dataframe.get_val_mask(col, val)) mask = reduce(lambda x, y: x & y, evaluations) - pandas_segment = pandas[mask] + segment_frame = dataframe.filter(mask) else: - pandas_segment = grouped_data.get_group(group) + segment_frame = dataframe.get_group(columns, group) segment_key = _get_segment_from_group_key(group, partition_id, explicit_keys) - _process_segment(pandas_segment, segment_key, segments, schema, segment_cache) + _process_segment(segment_frame, segment_key, segments, schema, segment_cache) elif row: # TODO: consider if we need to combine with the column names segment_key = Segment(tuple(str(row[element]) for element in columns) + explicit_keys, partition_id) @@ -94,18 +98,18 @@ def _process_simple_partition( def _filter_inputs( - filter: SegmentFilter, pandas: Optional[pd.DataFrame] = None, row: Optional[Mapping[str, Any]] = None + filter: SegmentFilter, dataframe: Optional[DataFrameWrapper] = None, row: Optional[Mapping[str, Any]] = None ) -> Tuple[Optional[pd.DataFrame], Optional[Dict[str, Any]]]: assert ( filter.filter_function or filter.query_string ), f"must define at least a filter function or query string when specifying a segment filter: {filter}" - filtered_pandas = None + filtered_dataframe = None filtered_row = None - if pandas is not None: + if dataframe is not None: if 
filter.filter_function: - filtered_pandas = pandas[filter.filter_function] + filtered_dataframe = dataframe.filter(filter.filter_function) elif filter.query_string: - filtered_pandas = pandas.query(filter.query_string) + filtered_dataframe = dataframe.query(filter.query_string) elif row is not None: if filter.filter_function: filtered_row = filter.filter_function(row) @@ -113,7 +117,7 @@ def _filter_inputs( raise ValueError( "SegmentFilter query string not supported when logging rows, either don't specify a filter or implement the filter.filter_function" ) - return (filtered_pandas, filtered_row) + return (filtered_dataframe, filtered_row) def _grouped_dataframe(partition: SegmentationPartition, pandas: pd.DataFrame): @@ -134,16 +138,17 @@ def _log_segment( row: Optional[Mapping[str, Any]] = None, segment_cache: Optional[SegmentCache] = None, segment_key_values: Optional[Dict[str, str]] = None, + dataframe: Optional[DataFrame] = None, ) -> Dict[Segment, Any]: segments: Dict[Segment, Any] = {} - pandas, row = _pandas_or_dict(obj, pandas, row) + dataframe, row = _dataframe_or_dict(obj, dataframe if dataframe is not None else pandas, row) if partition.filter: - pandas, row = _filter_inputs(partition.filter, pandas, row) + dataframe, row = _filter_inputs(partition.filter, dataframe, row) if partition.simple: columns = partition.mapper.col_names if partition.mapper else None if columns: _process_simple_partition( - partition.id, schema, segments, columns, pandas, row, segment_cache, segment_key_values + partition.id, schema, segments, columns, dataframe, row, segment_cache, segment_key_values ) else: logger.error( @@ -161,6 +166,7 @@ def segment_processing( row: Optional[Dict[str, Any]] = None, segment_cache: Optional[SegmentCache] = None, segment_key_values: Optional[Dict[str, str]] = None, + dataframe: Optional[DataFrame] = None, ) -> SegmentedResultSet: number_of_partitions = len(schema.segments) logger.info(f"The specified schema defines segments with 
{number_of_partitions} partitions.") @@ -189,6 +195,7 @@ def segment_processing( schema=schema, obj=obj, pandas=pandas, + dataframe=dataframe, row=row, segment_cache=segment_cache, segment_key_values=segment_key_values, diff --git a/python/whylogs/api/logger/transient.py b/python/whylogs/api/logger/transient.py index 850dbc6687..218b8e1a08 100644 --- a/python/whylogs/api/logger/transient.py +++ b/python/whylogs/api/logger/transient.py @@ -2,7 +2,7 @@ from whylogs.api.logger.logger import Logger from whylogs.core import DatasetProfile, DatasetSchema -from whylogs.core.stubs import pd +from whylogs.core.dataframe_wrapper import DataFrame class TransientLogger(Logger): @@ -13,7 +13,7 @@ def _get_matching_profiles( self, obj: Any = None, *, - pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrame] = None, row: Optional[Dict[str, Any]] = None, schema: Optional[DatasetSchema] = None, ) -> List[DatasetProfile]: diff --git a/python/whylogs/api/whylabs/session/notebook_logger.py b/python/whylogs/api/whylabs/session/notebook_logger.py index 7ba93ad9db..c32faf4cbf 100644 --- a/python/whylogs/api/whylabs/session/notebook_logger.py +++ b/python/whylogs/api/whylabs/session/notebook_logger.py @@ -6,11 +6,12 @@ from whylogs.api.whylabs.session.session_manager import get_current_session from whylogs.api.whylabs.session.session_types import InteractiveLogger as il from whylogs.api.whylabs.session.session_types import SessionType -from whylogs.core.stubs import pd +from whylogs.core.dataframe_wrapper import DataFrame, DataFrameWrapper +from whylogs.core.stubs import pd, pl def notebook_session_log_comparison( - data: Dict[str, Union["pd.DataFrame", List[Dict[str, Any]]]], result_sets: Dict[str, ResultSet] + data: Dict[str, Union[pd.DataFrame, List[Dict[str, Any]]]], result_sets: Dict[str, ResultSet] ) -> None: session = get_current_session() @@ -47,8 +48,8 @@ def notebook_session_log_comparison( traceback.print_exc() -def _get_loggable_length(loggable: 
Optional[Union["pd.DataFrame", Dict[str, Any]]]) -> Optional[int]: - if isinstance(loggable, pd.DataFrame): +def _get_loggable_length(loggable: Optional[Union[DataFrame, Dict[str, Any]]]) -> Optional[int]: + if isinstance(loggable, (pd.DataFrame, pl.DataFrame, DataFrameWrapper)): return len(loggable) elif isinstance(loggable, dict): return 1 @@ -61,6 +62,7 @@ def notebook_session_log( obj: Any = None, *, pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrame] = None, row: Optional[Dict[str, Any]] = None, name: Optional[str] = None, ) -> None: @@ -74,7 +76,12 @@ def notebook_session_log( return # Get the length of whatever was just logged - rows = _get_loggable_length(pandas) or _get_loggable_length(obj) or _get_loggable_length(row) + rows = ( + _get_loggable_length(pandas) + or _get_loggable_length(obj) + or _get_loggable_length(row) + or _get_loggable_length(dataframe) + ) il.message() if rows is not None: diff --git a/python/whylogs/api/whylabs/session/session.py b/python/whylogs/api/whylabs/session/session.py index 42a2b5f884..e34be7fbfe 100644 --- a/python/whylogs/api/whylabs/session/session.py +++ b/python/whylogs/api/whylabs/session/session.py @@ -7,7 +7,7 @@ from functools import partial from typing import Any, Dict, List, Optional, Set, Union, cast -import requests as web_requests +import requests as web_requests # type: ignore from whylabs_client import ApiException # type: ignore from whylabs_client.api.log_api import ( # type: ignore GetProfileObservatoryLinkRequest, diff --git a/python/whylogs/core/constraints/metric_constraints.py b/python/whylogs/core/constraints/metric_constraints.py index 9baa12c193..cd41d9621d 100644 --- a/python/whylogs/core/constraints/metric_constraints.py +++ b/python/whylogs/core/constraints/metric_constraints.py @@ -217,7 +217,7 @@ class DatasetComparisonConstraint: def validate_profile( self, dataset_profile: DatasetProfileView, reference_profile: DatasetProfileView ) -> Tuple[bool, Optional[Dict[str, Any]]]: 
- validate_result, summary = self.condition(dataset_profile, reference_profile) + (validate_result, summary) = self.condition(dataset_profile, reference_profile) # type: ignore return (validate_result, summary) @@ -245,7 +245,7 @@ def _get_metric_summary(self, metrics: Dict[str, Metric]) -> Optional[Dict[str, def validate_profile(self, dataset_profile: DatasetProfileView) -> Tuple[bool, Optional[Dict[str, Any]]]: try: - validate_result, metrics = self.condition(dataset_profile) + (validate_result, metrics) = self.condition(dataset_profile) # type: ignore except MissingMetric as e: if self.require_column_existence: logger.info(f"validate_profile could not get metric {str(e)} so returning False.") @@ -522,11 +522,15 @@ def validate(self, profile_view: Optional[DatasetProfileView] = None) -> bool: logger.info(f"{constraint_name} failed on column {column_name}") return False - for constraint in self.dataset_constraints + self.dataset_comparison_constraints: - if isinstance(constraint, DatasetConstraint): - (result, _) = constraint.validate_profile(profile) - elif isinstance(constraint, DatasetComparisonConstraint): - (result, _) = constraint.validate_profile(profile, self.reference_profile_view) + constraint: Union[DatasetConstraint, DatasetComparisonConstraint] + for constraint in self.dataset_constraints: + (result, _) = constraint.validate_profile(profile) + if not result: + logger.info(f"{constraint.name} failed on dataset") + return False + + for constraint in self.dataset_comparison_constraints: + (result, _) = constraint.validate_profile(profile, self.reference_profile_view) if not result: logger.info(f"{constraint.name} failed on dataset") return False @@ -605,7 +609,7 @@ def generate_constraints_report( results.append(metric_report) - for constraint in self.dataset_constraints + self.dataset_comparison_constraints: + for constraint in self.dataset_constraints + self.dataset_comparison_constraints: # type: ignore metric_report = self._generate_dataset_report( 
profile_view=profile, constraint=constraint, diff --git a/python/whylogs/core/dataframe_wrapper.py b/python/whylogs/core/dataframe_wrapper.py new file mode 100644 index 0000000000..68a0e79e80 --- /dev/null +++ b/python/whylogs/core/dataframe_wrapper.py @@ -0,0 +1,168 @@ +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +from whylogs.core.stubs import pd, pl + +DataFrame = Union[pd.DataFrame, pl.DataFrame, "DataFrameWrapper"] +Series = Union[pd.Series, pl.Series] + + +class DataFrameWrapper: + def __init__(self, dataframe: DataFrame): + # TODO: PandasDataFrame, PolarsDataFrame <: DataFrameWrapper + if isinstance(dataframe, DataFrameWrapper): + pandas, polars = dataframe.pd_df, dataframe.pl_df # type: ignore + elif isinstance(dataframe, pd.DataFrame): + pandas, polars = dataframe, None + else: + pandas, polars = None, dataframe + + self.pd_df = pandas + self.pl_df = polars + + self.column_names = list(pandas.columns) if pandas is not None else polars.columns # type: ignore + self.dtypes = pandas.dtypes if pandas is not None else polars.schema # type: ignore + self.empty = pandas.empty if pandas is not None else len(polars) == 0 # type: ignore + + def len(self) -> int: + return len(self.pd_df if self.pd_df is not None else self.pl_df) + + def __len__(self) -> int: + return len(self.pd_df if self.pd_df is not None else self.pl_df) + + def _update(self) -> None: + self.column_names = list(self.pd_df.columns) if self.pd_df is not None else self.pl_df.columns # type: ignore + self.dtypes = self.pd_df.dtypes if self.pd_df is not None else self.pl_df.schema # type: ignore + self.empty = self.pd_df.empty if self.pd_df is not None else len(self.pl_df) == 0 # type: ignore + + def get(self, column: str) -> Optional[Series]: + if self.pd_df is not None: + return self.pd_df.get(column) + return self.pl_df[column] if column in self.pl_df.schema else None # type: ignore + + def filter(self, filter: Any) -> Optional["DataFrameWrapper"]: + if self.pd_df is not 
None: + return DataFrameWrapper(self.pd_df[filter]) + if self.pl_df is not None: + return DataFrameWrapper(self.pl_df.filter(filter)) + return None + + def query(self, query: str) -> Optional["DataFrameWrapper"]: + if self.pd_df is not None: + return DataFrameWrapper(self.pd_df.query(query)) + if self.pl_df is not None: + ctx = pl.SQLContext(population=self.pl_df, eager=True) + return DataFrameWrapper(ctx.execute(query)) + return None + + def group_keys(self, columns: List[str]) -> List[Tuple[Any]]: + if self.pd_df is not None: + return self.pd_df.groupby(columns).groups.keys() + elif self.pl_df is not None: + return [x for x, y in self.pl_df.group_by(columns)] + return [] + + def groupby( + self, columns: List[str] + ) -> Any: # Union[pl.dataframe.group_by.GroupBy, pd.core.groupby.generic.DataFrameGroupBy] + if self.pd_df is not None: + grouped = self.pd_df.groupby(columns) + return grouped + d = {g: grouped.get_group(g) for g in grouped.groups.keys()} + return d + elif self.pl_df is not None: + return self.pl_df.group_by(columns) + + def get_nan_mask(self, column: str) -> List[bool]: + if self.pd_df is not None: + return self.pd_df[column].isna() # .to_list() + elif self.pl_df is not None: + return self.pl_df[column].is_nan() # .to_list() + return [] + + def get_val_mask(self, column: str, value: Any) -> List[bool]: + if self.pd_df is not None: + return self.pd_df[column] == value # .to_list() + elif self.pl_df is not None: + return self.pl_df[column] == value # .to_list() + return [] + + def get_group(self, columns: List[str], key: Tuple[Any]) -> Any: + if self.pd_df is not None: + grouped = self.pd_df.groupby(columns) + return grouped.get_group(key) + elif self.pl_df is not None: + grouped = self.pl_df.group_by(columns) + return {k: g for k, g in grouped}[key] + raise ValueError("Cannot group empty DataFrame") + + def concat(self, other: "DataFrameWrapper") -> None: + if self.pd_df is not None: + self.pd_df = pd.concat([self.pd_df, other.pd_df], axis=1) + 
self._update() + return + elif self.pl_df is not None: + self.pl_df = pl.concat([self.pl_df, other.pl_df], how="horizontal") + self._update() + return + raise ValueError("Cannot concatenate empty DataFrame") + + def drop_columns(self, columns: List[str]) -> None: + if self.pd_df is not None: + self.pd_df = self.pd_df.drop(columns=columns) + self._update() + return + elif self.pl_df is not None: + self.pl_df = self.pl_df.drop(columns) + self._update() + return + raise ValueError("Cannot drop columns from empty DataFrame") + + def __getitem__(self, key: str) -> "DataFrameWrapper": + if self.pd_df is not None: + return DataFrameWrapper(pd.DataFrame(self.pd_df[key])) + elif self.pl_df is not None: + return DataFrameWrapper(pl.DataFrame(self.pl_df[key])) + raise ValueError("Cannot index empty DataFrame") + + def __setitem__(self, key: str, value: Union[pd.Series, pl.Series]) -> None: + if self.pd_df is not None: + self.pd_df[key] = value + self._update() + return + elif self.pl_df is not None: + self.pl_df = self.pl_df.with_columns(value.alias(key)) + self._update() + return + raise ValueError("Cannot index empty DataFrame") + + def apply_udf(self, udf: Callable) -> Series: + if self.pd_df is not None: + return pd.Series(udf(self.pd_df)) + elif self.pl_df is not None: + return self.pl_df.map_rows(udf)["map"] + raise ValueError("Cannot apply UDFs to empty DataFrame") + + def apply_type_udf(self, udf: Callable) -> Series: + if self.pd_df is not None: + return pd.Series(udf(self.pd_df[self.pd_df.columns[0]])) + elif self.pl_df is not None: + return pl.Series(self.pl_df[self.pl_df.columns[0]].map_elements(udf)) + raise ValueError("Cannot apply UDFs to empty DataFrame") + + def apply_multicolumn_udf(self, udf: Callable) -> "DataFrameWrapper": + if self.pd_df is not None: + return DataFrameWrapper(udf(self.pd_df)) + elif self.pl_df is not None: + return DataFrameWrapper(udf(self.pl_df)) + raise ValueError("Cannot apply UDFs to empty DataFrame") + + def rename(self, columns: 
Dict[str, str]) -> None: + if self.pd_df is not None: + self.pd_df = self.pd_df.rename(columns=columns) + self._update() + return + elif self.pl_df is not None: + self.pl_df = self.pl_df.rename(columns) + self._update() + return + raise ValueError("Cannot rename an empty DataFrame") diff --git a/python/whylogs/core/dataset_profile.py b/python/whylogs/core/dataset_profile.py index ccf609cd64..ec9d1d2ed9 100644 --- a/python/whylogs/core/dataset_profile.py +++ b/python/whylogs/core/dataset_profile.py @@ -12,7 +12,8 @@ from whylogs.core.utils.utils import deprecated, deprecated_alias, ensure_timezone from .column_profile import ColumnProfile -from .input_resolver import _pandas_or_dict +from .dataframe_wrapper import DataFrame, DataFrameWrapper +from .input_resolver import _dataframe_or_dict from .schema import DatasetSchema from .stubs import pd from .view import DatasetProfileView @@ -109,32 +110,33 @@ def track( obj: Any = None, *, pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrame] = None, row: Optional[Mapping[str, Any]] = None, execute_udfs: bool = True, ) -> None: + dataframe, row = _dataframe_or_dict(obj, dataframe if dataframe is not None else pandas, row) + try: self._is_active = True self._track_count += 1 - self._do_track(obj, pandas=pandas, row=row, execute_udfs=execute_udfs) + self._do_track(dataframe=dataframe, row=row, execute_udfs=execute_udfs) finally: self._is_active = False def _do_track( self, - obj: Any = None, *, - pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrameWrapper] = None, row: Optional[Mapping[str, Any]] = None, execute_udfs: bool = True, ) -> None: - pandas, row = _pandas_or_dict(obj, pandas, row) if execute_udfs: - pandas, row = self._schema._run_udfs(pandas, row) + dataframe, row = self._schema._run_udfs(dataframe, row) col_id: Optional[str] = getattr(self._schema.default_configs, "identity_column", None) # TODO: do this less frequently when operating at row level - dirty = 
self._schema.resolve(pandas=pandas, row=row) + dirty = self._schema.resolve(dataframe=dataframe, row=row) if dirty: schema_col_keys = self._schema.get_col_names() new_cols = (col for col in schema_col_keys if col not in self._columns) @@ -146,23 +148,24 @@ def _do_track( self._columns[k]._track_datum(row[k], row_id) return - elif pandas is not None: + elif dataframe is not None: # TODO: iterating over each column in order assumes single column metrics # but if we instead iterate over a new artifact contained in dataset profile: "MetricProfiles", then # each metric profile can specify which columns its tracks, and we can call like this: # metric_profile.track(pandas) - if pandas.empty: - logger.warning("whylogs was passed an empty pandas DataFrame so nothing to profile in this call.") + if dataframe.empty: + logger.warning("whylogs was passed an empty DataFrame so nothing to profile in this call.") return - for k in pandas.keys(): - column_values = pandas.get(k) + for k in dataframe.column_names: + column_values = dataframe.get(k) if column_values is None: logger.error( - f"whylogs was passed a pandas DataFrame with key [{k}] but DataFrame.get({k}) returned nothing!" + f"whylogs was passed a DataFrame with key [{k}] but DataFrame.get({k}) returned nothing!" ) return dtype = self._schema.types.get(k) + # TODO: support Polars homogeneous columns? 
homogeneous = ( dtype is not None and isinstance(dtype, tuple) @@ -171,7 +174,7 @@ def _do_track( and dtype[1] == ColumnProperties.homogeneous ) - id_values = pandas.get(col_id) if col_id else None + id_values = dataframe.get(col_id) if col_id else None if col_id is not None and id_values is None: logger.warning(f"identity column was passed as {col_id} but column was not found in the dataframe.") diff --git a/python/whylogs/core/datatypes.py b/python/whylogs/core/datatypes.py index 9941043e10..5755be771a 100644 --- a/python/whylogs/core/datatypes.py +++ b/python/whylogs/core/datatypes.py @@ -2,7 +2,7 @@ from decimal import Decimal from typing import Any, Generic, List, Optional, Type, TypeVar, Union -from whylogs.core.stubs import is_not_stub, np +from whylogs.core.stubs import is_not_stub, np, pl try: from pandas.core.api import CategoricalDtype @@ -51,10 +51,13 @@ def _do_match(cls, dtype_or_type: Any, maybe_type: Optional[Any]) -> bool: if maybe_type: dtype_or_type = maybe_type # type: ignore + if issubclass(type(dtype_or_type), pl.datatypes.IntegerType): + return True + if not isinstance(dtype_or_type, type): return False - if issubclass(dtype_or_type, (bool, int, np.number, np.bool_)): + if issubclass(dtype_or_type, (bool, int, np.number, np.bool_, pl.datatypes.IntegerType)): if is_not_stub(np.issubdtype) and np.issubdtype(dtype_or_type, np.floating): return False if issubclass(dtype_or_type, (np.datetime64, np.timedelta64)): @@ -73,6 +76,9 @@ def _do_match(cls, dtype_or_type: Any, maybe_type: Optional[Any]) -> bool: if maybe_type: dtype_or_type = maybe_type + if issubclass(type(dtype_or_type), (pl.Float32, pl.Float64)): + return True + if not isinstance(dtype_or_type, type): return False @@ -85,6 +91,9 @@ def __init__(self) -> None: @classmethod def _do_match(cls, dtype_or_type: Any, maybe_type: Optional[Any]) -> bool: + if issubclass(type(dtype_or_type), (pl.String, pl.Utf8)): + return True + # Pandas Categorical is Strings if CategoricalDtype is not None 
and isinstance(dtype_or_type, CategoricalDtype): return True @@ -101,7 +110,7 @@ def _do_match(cls, dtype_or_type: Any, maybe_type: Optional[Any]) -> bool: if not isinstance(dtype_or_type, type): return False - if issubclass(dtype_or_type, (str, np.unicode_)): + if issubclass(dtype_or_type, (str, np.unicode_, pl.String, pl.Utf8)): return True return False diff --git a/python/whylogs/core/input_resolver.py b/python/whylogs/core/input_resolver.py index d6ace01d44..b5c22a0270 100644 --- a/python/whylogs/core/input_resolver.py +++ b/python/whylogs/core/input_resolver.py @@ -1,23 +1,29 @@ from typing import Any, Dict, Mapping, Optional, Tuple -from whylogs.core.stubs import pd +from whylogs.core.dataframe_wrapper import DataFrame, DataFrameWrapper +from whylogs.core.stubs import pd, pl -def _pandas_or_dict( - obj: Any, pandas: Optional[pd.DataFrame] = None, row: Optional[Mapping[str, Any]] = None -) -> Tuple[Optional[pd.DataFrame], Optional[Mapping[str, Any]]]: +def _dataframe_or_dict( + obj: Any, + dataframe: Optional[DataFrame] = None, + row: Optional[Mapping[str, Any]] = None, +) -> Tuple[Optional[DataFrameWrapper], Optional[Mapping[str, Any]]]: if obj is not None: - if pandas is not None: - raise ValueError("Cannot pass both obj and pandas params") + if dataframe is not None: + raise ValueError("Cannot pass both obj and dataframe params") if row is not None: raise ValueError("Cannot pass both obj and row params") if isinstance(obj, (dict, Dict, Mapping)): - row = obj - elif pd.DataFrame is not None and isinstance(obj, pd.DataFrame): - pandas = obj + return (None, obj) + elif isinstance(obj, DataFrameWrapper): + return (obj, None) + elif isinstance(obj, (pd.DataFrame, pl.DataFrame)): + return (DataFrameWrapper(obj), None) - if pandas is not None and row is not None: - raise ValueError("Cannot pass both pandas and row params") + if dataframe is not None and row is not None: + raise ValueError("Cannot pass both dataframe and row params") - return (pandas, row) + df = 
DataFrameWrapper(dataframe) if dataframe is not None else None + return (df, row) diff --git a/python/whylogs/core/metrics/condition_count_metric.py b/python/whylogs/core/metrics/condition_count_metric.py index 8d3bf49a12..61f4c8d684 100644 --- a/python/whylogs/core/metrics/condition_count_metric.py +++ b/python/whylogs/core/metrics/condition_count_metric.py @@ -200,7 +200,7 @@ def columnar_update(self, data: PreprocessedColumn) -> OperationResult: count += 1 for cond_name, condition in self.conditions.items(): try: - if condition.relation(datum): + if condition.relation(datum): # type: ignore self.matches[cond_name].set(self.matches[cond_name].value + 1) else: if condition.log_on_failure: diff --git a/python/whylogs/core/metrics/metrics.py b/python/whylogs/core/metrics/metrics.py index 2d103c53d3..b4343bde0e 100644 --- a/python/whylogs/core/metrics/metrics.py +++ b/python/whylogs/core/metrics/metrics.py @@ -7,7 +7,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union import whylogs_sketching as ds # type: ignore -from google.protobuf.struct_pb2 import Struct +from google.protobuf.struct_pb2 import Struct # type: ignore import whylogs.core.configs as conf from whylogs.core.metrics.maths import ( diff --git a/python/whylogs/core/metrics/unicode_range.py b/python/whylogs/core/metrics/unicode_range.py index 49302dccf3..f819fb8658 100644 --- a/python/whylogs/core/metrics/unicode_range.py +++ b/python/whylogs/core/metrics/unicode_range.py @@ -79,6 +79,7 @@ def columnar_update(self, view: PreprocessedColumn) -> OperationResult: view.pandas.strings.to_list() if view.pandas.strings is not None and not view.pandas.strings.empty else [] ) data = (data + view.list.strings) if view.list.strings else data + data = (data + view.numpy.strings.tolist()) if view.numpy.strings is not None else data range_data: Dict[str, List[int]] = {range_name: [] for range_name in self.range_definitions.keys()} lengths: List[int] = [] for value in data: diff --git 
a/python/whylogs/core/preprocessing.py b/python/whylogs/core/preprocessing.py index 02defa287d..9ca11b0e65 100644 --- a/python/whylogs/core/preprocessing.py +++ b/python/whylogs/core/preprocessing.py @@ -6,7 +6,7 @@ from math import isinf, isnan from typing import Any, Iterable, Iterator, List, Optional, Tuple, Union -from whylogs.core.stubs import is_not_stub, np, pd +from whylogs.core.stubs import is_not_stub, np, pd, pl logger = logging.getLogger("whylogs.core.views") @@ -169,9 +169,7 @@ def _pandas_split(self, series: pd.Series, parse_numeric_string: bool = False) - bool_mask_where_true = non_null_series.apply(lambda x: pdc.is_bool(x) and x) int_mask = non_null_series.apply(lambda x: pdc.is_number(x) and pdc.is_integer(x) and not pdc.is_bool(x)) str_mask = non_null_series.apply(lambda x: isinstance(x, str)) - tensor_mask = non_null_series.apply( - lambda x: isinstance(x, (list, np.ndarray)) and PreprocessedColumn._is_tensorable(x) - ) + tensor_mask = non_null_series.apply(lambda x: isinstance(x, (list, np.ndarray)) and _is_tensorable(x)) floats = non_null_series[float_mask] if non_null_series[int_mask].empty: @@ -199,6 +197,54 @@ def _pandas_split(self, series: pd.Series, parse_numeric_string: bool = False) - self.bool_count = bool_count self.bool_count_where_true = bool_count_where_true + def _polars_split(self, series: pl.Series, parse_numeric_string: bool = False) -> None: + """ + Split a Polars Series into numpy array and other Polars series. + + Args: + series: the original Polars series + parse_numeric_string: if set, this will coerce values into integer using pandas.to_numeric() method. + + Returns: + SplitSeries with multiple values, including numpy arrays for numbers, and strings as a Polars Series. 
+ """ + + # TODO: add a PolarsView, or convert PandasView to work with both Polars & Pandas + + if series is None: + return None + if pl.Series is None: + return None + + non_null_series = series.drop_nulls() + if non_null_series.len() < 1: + return + + self.null_count = series.null_count() + if series.dtype.is_numeric(): + if series.dtype.is_float(): + non_nan_series = non_null_series.drop_nans() + self.nan_count = non_null_series.len() - non_nan_series.len() + self.inf_count = non_null_series.is_infinite().sum() + self.numpy.floats = non_null_series.to_numpy() + return + else: + self.numpy.ints = non_null_series.to_numpy() + return + + if series.dtype in {pl.String, pl.Categorical, pl.Enum, pl.Utf8}: + self.numpy.strings = non_null_series.to_numpy() + return + + # TODO: tensor support + + if series.dtype == pl.Boolean: + self.bool_count = non_null_series.len() + self.bool_count_where_true = non_null_series.sum() + return + + self.list.objs = non_null_series.to_list() + def raw_iterator(self) -> Iterator[Any]: iterables = [ *self.numpy.iterables(), @@ -234,9 +280,9 @@ def _process_scalar_value(value: Any) -> "PreprocessedColumn": float_list.append(value) elif isinstance(value, str): string_list.append(value) - elif isinstance(value, list) and PreprocessedColumn._is_tensorable(value): + elif isinstance(value, list) and _is_tensorable(value): tensor_list.append(np.asarray(value)) - elif is_not_stub(np.ndarray) and PreprocessedColumn._is_tensorable(value): + elif is_not_stub(np.ndarray) and _is_tensorable(value): tensor_list.append(value) elif value is not None: obj_list.append(value) @@ -292,7 +338,7 @@ def _process_homogeneous_column(series: pd.Series) -> "PreprocessedColumn": result.bool_count = series.count() result.bool_count_where_true = series[bool_mask_where_true].count() return result - elif isinstance(value, (list, np.ndarray)) and PreprocessedColumn._is_tensorable(value): + elif isinstance(value, (list, np.ndarray)) and _is_tensorable(value): if 
isinstance(value, np.ndarray): result.pandas.tensors = series else: @@ -312,6 +358,11 @@ def apply(data: Any) -> "PreprocessedColumn": result.len = len(data) return result + if pl.Series is not None and isinstance(data, pl.Series): + result._polars_split(data) + result.len = len(data) + return result + if isinstance(data, np.ndarray): result.len = len(data) if issubclass(data.dtype.type, (np.number, np.str_)): @@ -343,9 +394,9 @@ def apply(data: Any) -> "PreprocessedColumn": float_list.append(x) elif isinstance(x, str): string_list.append(x) - elif isinstance(x, list) and PreprocessedColumn._is_tensorable(x): + elif isinstance(x, list) and _is_tensorable(x): tensor_list.append(np.asarray(x)) - elif isinstance(x, np.ndarray) and PreprocessedColumn._is_tensorable(x): + elif isinstance(x, np.ndarray) and _is_tensorable(x): tensor_list.append(x) elif x is not None: obj_list.append(x) @@ -378,14 +429,14 @@ def apply(data: Any) -> "PreprocessedColumn": list_format = [data] return PreprocessedColumn.apply(list_format) - @staticmethod - def _is_tensorable(value: Union[np.ndarray, List[Any]]) -> bool: - if not is_not_stub(np.ndarray): - return False - - maybe_tensor = value if isinstance(value, np.ndarray) else np.asarray(value) - return ( - len(maybe_tensor.shape) > 0 - and all([i > 0 for i in maybe_tensor.shape]) - and np.issubdtype(maybe_tensor.dtype, np.number) - ) + +def _is_tensorable(value: Union[np.ndarray, List[Any]]) -> bool: + if not is_not_stub(np.ndarray): + return False + + maybe_tensor = value if isinstance(value, np.ndarray) else np.asarray(value) + return ( + len(maybe_tensor.shape) > 0 + and all([i > 0 for i in maybe_tensor.shape]) + and np.issubdtype(maybe_tensor.dtype, np.number) + ) diff --git a/python/whylogs/core/schema.py b/python/whylogs/core/schema.py index 7cef397bd9..1e3bd199db 100644 --- a/python/whylogs/core/schema.py +++ b/python/whylogs/core/schema.py @@ -4,6 +4,7 @@ from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, TypeVar, 
Union import whylogs.core.resolvers as res +from whylogs.core.dataframe_wrapper import DataFrame, DataFrameWrapper from whylogs.core.datatypes import StandardTypeMapper, TypeMapper from whylogs.core.metrics.metrics import Metric, MetricConfig from whylogs.core.resolvers import ( @@ -131,10 +132,13 @@ def resolve( self, *, pandas: Optional[pd.DataFrame] = None, + dataframe: Optional[DataFrame] = None, row: Optional[Mapping[str, Any]] = None, ) -> bool: + if dataframe is not None: + return self._resolve_dataframe(DataFrameWrapper(dataframe)) if pandas is not None: - return self._resolve_pdf(pandas) + return self._resolve_dataframe(DataFrameWrapper(pandas)) if row is not None: for k, v in row.items(): @@ -151,12 +155,12 @@ def resolve( return True raise NotImplementedError - def _resolve_pdf(self, df: pd.DataFrame, force_resolve: bool = False) -> bool: + def _resolve_dataframe(self, df: DataFrameWrapper, force_resolve: bool = False) -> bool: """ Resolve ColumnSchema from the dataframe. We only resolve newly detected columns unless `force_resolve` is set to True. 
""" - col_names = df.dtypes.keys() + col_names = df.column_names dirty = False for col_name in col_names: if not force_resolve and col_name in self._columns: @@ -176,9 +180,9 @@ def _resolve_pdf(self, df: pd.DataFrame, force_resolve: bool = False) -> bool: return dirty def _run_udfs( - self, pandas: Optional[pd.DataFrame] = None, row: Optional[Mapping[str, Any]] = None - ) -> Tuple[Optional[pd.DataFrame], Optional[Mapping[str, Any]]]: - return pandas, row + self, df: Optional[DataFrameWrapper] = None, row: Optional[Mapping[str, Any]] = None + ) -> Tuple[Optional[DataFrameWrapper], Optional[Mapping[str, Any]]]: + return df, row def get_col_names(self) -> tuple: return tuple(self._columns.keys()) diff --git a/python/whylogs/core/stubs.py b/python/whylogs/core/stubs.py index 629807cd3f..5ae3f34e63 100644 --- a/python/whylogs/core/stubs.py +++ b/python/whylogs/core/stubs.py @@ -9,6 +9,11 @@ except ImportError: # noqa _pd = None # type: ignore +try: + import polars as _pl +except ImportError: # noqa + _pl = None # type: ignore + try: import numpy as _np except ImportError: # noqa @@ -55,6 +60,25 @@ class PandasStub(object): DataFrame: type = _StubClass +@dataclass(frozen=True) +class _PolarsTypeStub: + IntegerType: type = _StubClass + + +@dataclass(frozen=True) +class PolarsStub(object): + Series: type = _StubClass + DataFrame: type = _StubClass + # TODO: support more Polars types? 
+ datatypes: type = _PolarsTypeStub + Int32: type = _StubClass + Int64: type = _StubClass + Float32: type = _StubClass + Float64: type = _StubClass + String: type = _StubClass + Utf8: type = _StubClass + + @dataclass(frozen=True) class ScipyStub: sparse: type = _StubClass @@ -72,18 +96,25 @@ def is_not_stub(stubbed_class: Any) -> bool: if ( stubbed_class and stubbed_class is not _StubClass - and not isinstance(stubbed_class, (PandasStub, NumpyStub, ScipyStub, ScikitLearnStub)) + and not isinstance(stubbed_class, (PandasStub, PolarsStub, NumpyStub, ScipyStub, ScikitLearnStub)) ): return True return False +def is_stub(stubbed_class: Any) -> bool: + return not is_not_stub(stubbed_class) + + if _np is None: _np = NumpyStub() if _pd is None: _pd = PandasStub() +if _pl is None: + _pl = PolarsStub() + if _sp is None: _sp = ScipyStub() @@ -99,6 +130,7 @@ def is_not_stub(stubbed_class: Any) -> bool: np = _np pd = _pd +pl = _pl sp = _sp sklp = _sklp sklc = _sklc diff --git a/python/whylogs/core/utils/protobuf_utils.py b/python/whylogs/core/utils/protobuf_utils.py index a3745c52da..7f92498e23 100644 --- a/python/whylogs/core/utils/protobuf_utils.py +++ b/python/whylogs/core/utils/protobuf_utils.py @@ -6,7 +6,7 @@ from logging import getLogger from typing import IO, Type, TypeVar -from google.protobuf.message import Message +from google.protobuf.message import Message # type: ignore from whylogs.core.errors import DeserializationError diff --git a/python/whylogs/core/validators/validator.py b/python/whylogs/core/validators/validator.py index 06a71cd7c9..87a33408b5 100644 --- a/python/whylogs/core/validators/validator.py +++ b/python/whylogs/core/validators/validator.py @@ -6,7 +6,7 @@ from whylogs.core.metrics.condition_count_metric import Condition -@dataclass +@dataclass # type: ignore class Validator(ABC): name: str conditions: Dict[str, Union[Condition, Callable[[Any], bool]]] diff --git a/python/whylogs/core/view/dataset_profile_view.py 
b/python/whylogs/core/view/dataset_profile_view.py index 686e6f75f0..f3f1a358cc 100644 --- a/python/whylogs/core/view/dataset_profile_view.py +++ b/python/whylogs/core/view/dataset_profile_view.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, BinaryIO, Dict, List, Optional, Tuple, Union -from google.protobuf.message import DecodeError +from google.protobuf.message import DecodeError # type: ignore from whylogs.api.writer.writer import _Writable from whylogs.core.configs import SummaryConfig diff --git a/python/whylogs/experimental/core/udf_schema.py b/python/whylogs/experimental/core/udf_schema.py index 4cc612d45a..7992b19696 100644 --- a/python/whylogs/experimental/core/udf_schema.py +++ b/python/whylogs/experimental/core/udf_schema.py @@ -16,12 +16,13 @@ Union, ) +from whylogs.core.dataframe_wrapper import DataFrame, DataFrameWrapper from whylogs.core.datatypes import DataType, StandardTypeMapper, TypeMapper from whylogs.core.metrics.metrics import Metric, MetricConfig from whylogs.core.resolvers import NO_FI_RESOLVER, MetricSpec, ResolverSpec from whylogs.core.schema import DeclarativeSchema from whylogs.core.segmentation_partition import SegmentationPartition -from whylogs.core.stubs import pd +from whylogs.core.stubs import pd, pl from whylogs.core.validators.validator import Validator from whylogs.experimental.core.metrics.udf_metric import ( _reset_metric_udfs, @@ -109,7 +110,7 @@ def _apply_udf_on_row( def _apply_udfs_on_dataframe( - pandas: pd.DataFrame, udfs: Dict, new_df: pd.DataFrame, input_cols: Collection[str] + df: DataFrameWrapper, udfs: Dict, new_df: DataFrameWrapper, input_cols: Collection[str] ) -> None: """multiple input columns, single output column""" for new_col, udf in udfs.items(): @@ -117,23 +118,24 @@ def _apply_udfs_on_dataframe( continue try: - new_df[new_col] = pd.Series(udf(pandas)) + tmp = df.apply_udf(udf) + new_df[new_col] = tmp except Exception as e: # noqa - new_df[new_col] = pd.Series([None]) - 
logger.exception(f"Evaluating UDF {new_col} failed on columns {pandas.keys()} with error {e}") + new_df[new_col] = df.apply_udf(lambda x: float("nan")) # should be None, but can't infer type + logger.exception(f"Evaluating UDF {new_col} failed on columns {df.column_names} with error {e}") def _apply_udf_on_dataframe( name: str, prefix: Optional[str], - pandas: pd.DataFrame, + df: DataFrameWrapper, udf: Callable, - new_df: pd.DataFrame, + new_df: DataFrameWrapper, input_cols: Collection[str], ) -> None: """ multiple input columns, multiple output columns - udf(Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame] + udf(Union[Dict[str, List], pd.DataFrame, pl.DataFrame]) -> Union[Dict[str, List], pd.DataFrame, pl.DataFrame] """ def add_prefix(col): @@ -141,24 +143,23 @@ def add_prefix(col): try: # TODO: I think it's OKAY if udf returns a dictionary - udf_output = pd.DataFrame(udf(pandas)) - udf_output = udf_output.rename(columns={old: add_prefix(old) for old in udf_output.keys()}) - for new_col in udf_output.keys(): - new_df[new_col] = udf_output[new_col] + udf_output = df.apply_multicolumn_udf(udf) # pd.DataFrame(udf(pandas)) + udf_output.rename(columns={old: add_prefix(old) for old in udf_output.column_names}) + new_df.concat(udf_output) except Exception as e: # noqa - logger.exception(f"Evaluating UDF {name} failed on columns {pandas.keys()} with error {e}") + logger.exception(f"Evaluating UDF {name} failed on columns {df.column_names} with error {e}") return pd.DataFrame() -def _apply_type_udfs(pandas: pd.Series, udfs: Dict, new_df: pd.DataFrame, input_cols: Collection[str]) -> None: +def _apply_type_udfs(df: DataFrameWrapper, udfs: Dict, new_df: pd.DataFrame, input_cols: Collection[str]) -> None: for new_col, udf in udfs.items(): if new_col in input_cols: continue try: - new_df[new_col] = pd.Series(udf(pandas)) + new_df[new_col] = df.apply_type_udf(udf) except Exception as e: # noqa - new_df[new_col] = pd.Series([None]) + new_df[new_col] 
= df.apply_udf(lambda x: float("nan")) # should be None, but can't infer type logger.exception(f"Evaluating UDF {new_col} failed on column {new_col} with error {e}") @@ -222,45 +223,62 @@ def _run_udfs_on_row( udfs = {f"{column}.{key}": spec.udfs[key] for key in spec.udfs.keys()} _apply_udfs_on_row([value], udfs, new_columns, input_cols) - def _run_udfs_on_dataframe(self, pandas: pd.DataFrame, new_df: pd.DataFrame, input_cols: Collection[str]) -> None: + def _run_udfs_on_dataframe( + self, df: DataFrameWrapper, new_df: DataFrameWrapper, input_cols: Collection[str] + ) -> None: for spec in self.multicolumn_udfs: - if spec.column_names and set(spec.column_names).issubset(set(pandas.keys())): + if spec.column_names and set(spec.column_names).issubset(set(df.column_names)): if spec.udf is not None: _apply_udf_on_dataframe( - spec.name, spec.prefix, pandas[spec.column_names], spec.udf, new_df, input_cols # type: ignore + spec.name, spec.prefix, df[spec.column_names], spec.udf, new_df, input_cols # type: ignore ) else: - _apply_udfs_on_dataframe(pandas[spec.column_names], spec.udfs, new_df, input_cols) + _apply_udfs_on_dataframe(df[spec.column_names], spec.udfs, new_df, input_cols) - for column, dtype in pandas.dtypes.items(): + for column, dtype in df.dtypes.items(): why_type = type(self.type_mapper(dtype)) for spec in self.type_udfs[why_type]: udfs = {f"{column}.{key}": spec.udfs[key] for key in spec.udfs.keys()} - _apply_type_udfs(pandas[column], udfs, new_df, input_cols) + _apply_type_udfs(df[column], udfs, new_df, input_cols) def _run_udfs( - self, pandas: Optional[pd.DataFrame] = None, row: Optional[Dict[str, Any]] = None - ) -> Tuple[Optional[pd.DataFrame], Optional[Mapping[str, Any]]]: + self, df: Optional[DataFrameWrapper] = None, row: Optional[Dict[str, Any]] = None + ) -> Tuple[Optional[DataFrameWrapper], Optional[Dict[str, Any]]]: new_columns = deepcopy(row) if row else None - new_df = pd.DataFrame() + if df: + new_df = DataFrameWrapper(pd.DataFrame()) if 
df.pd_df is not None else DataFrameWrapper(pl.DataFrame()) + else: + new_df = None + if row is not None: self._run_udfs_on_row(row, new_columns, row.keys()) # type: ignore if self.drop_columns: for col in set(row.keys()).intersection(self.drop_columns): row.pop(col) - if pandas is not None: - self._run_udfs_on_dataframe(pandas, new_df, pandas.keys()) - new_df = pd.concat([pandas, new_df], axis=1) + if df is not None: + self._run_udfs_on_dataframe(df, new_df, df.column_names) + df.concat(new_df) if self.drop_columns: - new_df = new_df.drop(columns=list(set(new_df.keys()).intersection(self.drop_columns))) - - return new_df if pandas is not None else None, new_columns + df.drop_columns(columns=list(set(df.column_names).intersection(self.drop_columns))) + return df if df is not None else None, new_columns def apply_udfs( - self, pandas: Optional[pd.DataFrame] = None, row: Optional[Dict[str, Any]] = None - ) -> Tuple[Optional[pd.DataFrame], Optional[Mapping[str, Any]]]: - return self._run_udfs(pandas, row) + self, + pandas: Optional[pd.DataFrame] = None, + row: Optional[Dict[str, Any]] = None, + dataframe: Optional[DataFrame] = None, + ) -> Tuple[Optional[DataFrame], Optional[Mapping[str, Any]]]: + df = ( + DataFrameWrapper(dataframe if dataframe is not None else pandas) + if (pandas is not None or dataframe is not None) + else None + ) + df, row = self._run_udfs(df, row) + if df is not None: + df = df.pd_df if df.pd_df is not None else df.pl_df + + return df, row _multicolumn_udfs: Dict[str, List[UdfSpec]] = defaultdict(list) @@ -480,7 +498,7 @@ def udf_schema( for name in schema_names: resolver_specs += _resolver_specs[name] - validators = generate_validators(validators, name, include_default_schema=True) + validators = generate_validators(validators, name, include_default_schema=include_default_schema) resolver_specs += generate_udf_resolvers(schema_name, include_default_schema) return UdfSchema(