[WIP] Compatibility with Polars V1 (#233)
* Make regex strings raw strings

* Remove True from string cache call

* Utilise non-strict dataframe init to make tests more meaningful

* Use non-strict in dataframe init also

* Names in group_by iteration are now returned as a tuple

* Update named group_by keys, non-strict inits

* partition_by now also returns tuples as keys

* Add basic sanity test for model_summary (it failed)

* suffix is now in the `name` namespace (see the sketch after this list)

* Remove assert

* Add pandoc version check, so we get a warning when not installed

* Add testbook as explicit test optional dependency

* Set min polars version to 1

* bump version

* expr.cut update

* add version info to logs and fix quarto process

* Revert "add version info to logs and fix quarto process"

This reverts commit 2c153db.

* update polars to 1.7

* Bump polars version

* Drop support for python 3.8

* Run tests on python 3.13

* allow rc for 3.13

* try drop patch version

* Dropping 3.13 again as it's clearly not yet supported in GH Actions. Will create a separate PR

* Bump version for minor release

* Bump version for minor release
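
The following is a minimal, hypothetical sketch (not taken from the repository) of the Polars 1.x API changes the commits above adapt to: the string cache call, non-strict DataFrame init, tuple keys from group_by/partition_by, and the `name` namespace for suffix.

```python
import polars as pl

# Polars 1.x: enable_string_cache() no longer takes a boolean argument.
pl.enable_string_cache()

# Non-strict init: mixed int/float values are coerced instead of raising.
df = pl.DataFrame({"group": ["a", "a", "b"], "value": [1, 2.5, 3]}, strict=False)

# Expr.suffix moved to the `name` namespace.
summary = df.group_by("group").agg(pl.col("value").mean().name.suffix("_mean"))

# group_by iteration and partition_by(..., as_dict=True) now key on tuples,
# even when grouping by a single column.
for (group,), sub in df.group_by("group"):
    print(group, sub.height)
parts = df.partition_by("group", as_dict=True)  # keys like ("a",), ("b",)
```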

---------

Co-authored-by: Uyanik, Yusuf <[email protected]>
StijnKas and yusufuyanik1 authored Oct 8, 2024
1 parent 6677ddf commit 7c91518
Showing 18 changed files with 776 additions and 105 deletions.
19 changes: 14 additions & 5 deletions .github/workflows/Python tests.yml
@@ -17,21 +17,29 @@ jobs:

name: pdstools (Python ${{ matrix.python-version }}) on ${{ matrix.os }}


strategy:
fail-fast: true
matrix:
os: [ubuntu-latest, windows-latest, macOS-latest]
python-version: [3.8.x, 3.9.x, 3.11.x, 3.12.x]
python-version: [3.9.x, 3.10.x, 3.11.x, 3.12.x]
exclude:
- os: windows-latest
python-version: 3.8.x
python-version: 3.9.x
- os: windows-latest
python-version: 3.10.x
- os: windows-latest
python-version: 3.11.x
# - os: windows-latest
# python-version: 3.12.x
- os: macOS-latest
python-version: 3.9.x
- os: macOS-latest
python-version: 3.8.x
python-version: 3.10.x
- os: macOS-latest
python-version: 3.9.x

python-version: 3.11.x
# - os: macOS-latest
# python-version: 3.12.x
env:
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}

@@ -42,6 +50,7 @@ jobs:
uses: actions/setup-python@v4
with:
python-version: ${{matrix.python-version}}
allow-prereleases: true

- name: Upgrade pip
run: |
6 changes: 3 additions & 3 deletions examples/articles/thompsonsampling.ipynb
@@ -79,9 +79,9 @@
"\n",
"# Convergence of the Thompson Sampled propensities\n",
"s = thompsonSamplingSimulation['positives']\n",
"thompsonSamplingSimulation2 = thompsonSamplingSimulation.hstack(s.cut(breaks=np.array(range(int(s.min()), int(s.max())+20, 20))-1, series=False).select(bin='category'))\n",
"thompsonSamplingSimulation2 = thompsonSamplingSimulation.hstack([s.cut(breaks=np.array(range(int(s.min()), int(s.max())+20, 20))-1).rename(\"bin\")])\n",
"s = thompsonSamplingSimulation2.group_by(\"p\", \"bin\").agg(\n",
" n=pl.count(),\n",
" n=pl.len(),\n",
" n90=(((pl.col(\"sampled_propensity\") - pl.col(\"p\")) / pl.col(\"p\")) < 0.1).sum(),\n",
" positives=pl.min(\"positives\"),\n",
").with_columns(pct=pl.col('n90')/pl.col('n')).sort('p', 'bin').with_columns(pl.col('p').cast(pl.Utf8).cast(pl.Categorical))\n",
@@ -194,7 +194,7 @@
" pl.lit(1.0),\n",
" (\n",
" pl.lit(ncust)\n",
" / pl.col(\"impressions\").where(pl.col(\"week\") == w).sum()\n",
" / pl.col(\"impressions\").filter(pl.col(\"week\") == w).sum()\n",
" ),\n",
" ]\n",
" )\n",
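
For context (hypothetical data, not part of the notebook), the three Polars 1.x changes reflected in the cells above are: `Series.cut` returns a Categorical Series directly and no longer takes `series=` / `category_label=`, `pl.count()` is replaced by `pl.len()`, and `Expr.where` is replaced by `Expr.filter`:

```python
import polars as pl

s = pl.Series("positives", [3, 17, 25, 41, 58])

# Series.cut now returns a Categorical Series directly; rename it and hstack
# it onto the frame as a new column.
binned = s.cut(breaks=[0, 20, 40, 60]).rename("bin")
df = pl.DataFrame({"positives": s}).hstack([binned])

# pl.count() -> pl.len(); Expr.where -> Expr.filter inside an aggregation.
out = df.group_by("bin").agg(
    n=pl.len(),
    big_sum=pl.col("positives").filter(pl.col("positives") > 10).sum(),
)
```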
1 change: 1 addition & 0 deletions pyproject.toml
@@ -41,6 +41,7 @@ dependencies = {file = ["python/requirements.txt"]}
[project.optional-dependencies]
docs = ['sphinx','furo','sphinx-autoapi','nbsphinx','sphinx-copybutton','myst-parser']
app = ['streamlit>=1.23', 'quarto', 'papermill', 'itables', 'jinja2>=3.1', 'xlsxwriter>=3.0', 'tabulate', 'st-pages']
tests = ['testbook']

[project.urls]
"Homepage" = "https://github.com/pegasystems/pega-datascientist-tools"
32 changes: 30 additions & 2 deletions python/pdstools/__init__.py
@@ -1,6 +1,6 @@
"""Pega Data Scientist Tools Python library"""

__version__ = "3.4.8-beta"
__version__ = "3.5.0"

from polars import enable_string_cache

@@ -26,6 +26,34 @@
from .valuefinder.ValueFinder import ValueFinder

if "streamlit" in sys.modules:
from .utils import streamlit_utils
from .utils import streamlit_utils as streamlit_utils

__reports__ = Path(__file__).parents[0] / "reports"

__all__ = [
"ADMDatamart",
"ADMTrees",
"MultiTrees",
"BinAggregator",
"DecisionData",
"API",
"S3",
"Anonymization",
"File",
"get_token",
"readDSExport",
"setupAzureOpenAI",
"Prediction",
"NBAD",
"cdh_utils",
"datasets",
"errors",
"defaultPredictorCategorization",
"CDHLimits",
"CDHSample",
"SampleTrees",
"SampleValueFinder",
"show_versions",
"PegaDefaultTables",
"ValueFinder",
]
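
As an aside (a hypothetical module, not from this repository), the self-alias import and the `__all__` list above follow the standard pattern for marking names as intentional re-exports:

```python
# mypackage/__init__.py -- hypothetical example of the same re-export pattern
from .core import Widget as Widget  # PEP 484-style explicit re-export

__all__ = ["Widget"]  # governs `from mypackage import *` and documents the API
```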
96 changes: 57 additions & 39 deletions python/pdstools/adm/ADMDatamart.py
@@ -12,6 +12,7 @@

import polars as pl
import yaml
from polars.exceptions import ColumnNotFoundError

from .. import pega_io
from ..plots.plot_base import Plots
@@ -173,7 +174,9 @@ def __init__(
)
if self.modelData is not None:
missing_context_keys = [
key for key in self.context_keys if key not in self.modelData.columns
key
for key in self.context_keys
if key not in self.modelData.collect_schema().names()
]
if missing_context_keys:
self.modelData = self.modelData.with_columns(
@@ -307,7 +310,8 @@ def import_data(
**reading_opts,
)
if df2 is not None:
if "BinResponseCount" not in df2.columns: # pragma: no cover
df2_columns = df2.collect_schema().names()
if "BinResponseCount" not in df2_columns: # pragma: no cover
df2 = df2.with_columns(
(pl.col("BinPositives") + pl.col("BinNegatives")).alias(
"BinResponseCount"
Expand All @@ -323,7 +327,7 @@ def import_data(

if (
self.predictorCategorization is not None
and "PredictorCategory" not in df2.columns
and "PredictorCategory" not in df2_columns
):
if not isinstance(self.predictorCategorization, pl.Expr):
self.predictorCategorization = self.predictorCategorization()
@@ -332,7 +336,9 @@
if df1 is not None and df2 is not None:
total_missing = (
set(self.missing_model)
& set(self.missing_preds) - set(df1.columns) - set(df2.columns)
& set(self.missing_preds)
- set(df1.collect_schema().names())
- set(df2_columns)
) - {"Treatment"}
if len(total_missing) > 0 and verbose: # pragma: no cover
print(
@@ -436,7 +442,7 @@ def _import_utils(
df = self._apply_query(df, self.query)
if self.verbose:
print(f"Query successful for {name}.")
except pl.ColumnNotFoundError:
except ColumnNotFoundError:
if self.verbose:
print(
f"""Query unsuccessful for {name}.
@@ -498,7 +504,9 @@ def _available_columns(
"GroupIndex",
} # NOTE: these default names are already capitalized properly, with py/px/pz removed.

rename = {i: "Name" for i in df.columns if i.lower() == "modelname"}
rename = {
i: "Name" for i in df.collect_schema().names() if i.lower() == "modelname"
}
if len(rename) > 0:
df = df.rename(rename)

@@ -510,8 +518,10 @@
)

include_cols = default_names.union(include_cols).difference(drop_cols)
missing = {col for col in include_cols if col not in df.columns}
to_import = include_cols.intersection(set(df.columns))
missing = {
col for col in include_cols if col not in df.collect_schema().names()
}
to_import = include_cols.intersection(set(df.collect_schema().names()))

return to_import, missing

@@ -553,7 +563,7 @@ def _set_types(
timestamp_fmt=timestamp_fmt,
strict_conversion=strict_conversion,
)
if "SnapshotTime" not in df.columns:
if "SnapshotTime" not in df.collect_schema().names():
df = df.with_columns(SnapshotTime=None)
return df

@@ -592,7 +602,7 @@ def last(
@staticmethod
def _last(df: any_frame) -> any_frame:
"""Method to retrieve only the last snapshot."""
if df.select("SnapshotTime").dtypes[0] == pl.datatypes.Null:
if df.select("SnapshotTime").collect_schema().dtypes()[0] == pl.datatypes.Null:
return df

return df.filter(
@@ -740,24 +750,27 @@ def _apply_query(
Filtered Polars DataFrame
"""
if isinstance(df, pl.DataFrame):
df_cols = df.columns
df = df.lazy()
else:
df_cols = df.collect_schema().names()
if query is not None:
if isinstance(query, pl.Expr):
col_diff = set(query.meta.root_names()) - set(df.columns)
col_diff = set(query.meta.root_names()) - set(df_cols)
if len(col_diff) == 0:
return df.filter(query)

else:
raise pl.ColumnNotFoundError(col_diff)
raise ColumnNotFoundError(col_diff)

if isinstance(query, list):
for item in query:
if isinstance(item, pl.Expr):
col_diff = set(item.meta.root_names()) - set(df.columns)
col_diff = set(item.meta.root_names()) - set(df_cols)
if len(col_diff) == 0:
df = df.filter(item)
else:
raise pl.ColumnNotFoundError(col_diff)
raise ColumnNotFoundError(col_diff)
else:
raise ValueError(item)
return df
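
Two patterns recur throughout this file; here is a minimal sketch with hypothetical column names (not the library's actual schema): the exception class is imported from `polars.exceptions`, and LazyFrame column names are resolved via `collect_schema()` instead of `.columns`:

```python
import polars as pl
from polars.exceptions import ColumnNotFoundError  # was pl.ColumnNotFoundError

lf = pl.LazyFrame({"Channel": ["Web"], "Positives": [1]})

# Resolve LazyFrame column names explicitly rather than touching .columns,
# which resolves the schema implicitly in Polars 1.x.
lf_cols = lf.collect_schema().names()

query = pl.col("Direction") == "Inbound"
missing = set(query.meta.root_names()) - set(lf_cols)
if missing:
    raise ColumnNotFoundError(missing)
filtered = lf.filter(query)
```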
@@ -809,7 +822,7 @@ def discover_modelTypes(
"""
if self.import_strategy != "eager" and allow_collect == False:
raise NotEagerError("Discovering AGB models")
if "Modeldata" not in df.columns:
if "Modeldata" not in df.collect_schema().names():
raise ValueError(
(
"Modeldata column not in the data. "
@@ -949,15 +962,15 @@ def _create_sign_df(
.alias("Daily_increase")
.over("ModelID")
)
.group_by_dynamic("SnapshotTime", every=every, by=by)
.group_by_dynamic("SnapshotTime", every=every, group_by=by)
.agg(pl.sum("Daily_increase").alias("Increase"))
)
if pivot:
df = (
df.collect()
.pivot(
index="SnapshotTime",
columns=by,
on=by,
values="Increase",
aggregate_function="first",
)
@@ -998,16 +1011,18 @@ def model_summary(
required_columns = set(aggcols).union({by})

context_keys = kwargs.get("context_keys", self.context_keys)
assert required_columns.issubset(set(data.columns) | set(context_keys))
assert required_columns.issubset(
set(data.collect_schema().names()) | set(context_keys)
)

return (
data.group_by(context_keys)
.agg(
[
pl.count(by).suffix("_count"),
pl.col([aggcols[0], aggcols[3]]).sum().suffix("_sum"),
pl.col(aggcols).max().suffix("_max"),
pl.col(aggcols).mean().suffix("_mean"),
pl.count(by).name.suffix("_count"),
pl.col([aggcols[0], aggcols[3]]).sum().name.suffix("_sum"),
pl.col(aggcols).max().name.suffix("_max"),
pl.col(aggcols).mean().name.suffix("_mean"),
(pl.col("ResponseCount") == 0)
.sum()
.alias("Count_without_responses"),
@@ -1080,7 +1095,7 @@ def pivot_df(
df.collect()
.pivot(
index=by,
columns="PredictorName",
on="PredictorName",
values="PerformanceBin",
aggregate_function="first",
)
@@ -1155,24 +1170,20 @@ def models_by_positives_df(

modelsByPositives = df.select([by, "Positives", "ModelID"]).collect()
return (
modelsByPositives.join(
modelsByPositives["Positives"].cut(
modelsByPositives.with_columns(
PositivesBin=modelsByPositives["Positives"].cut(
breaks=list(range(0, 210, 10)),
series=False,
category_label="PositivesBin",
),
on="Positives",
how="left",
)
)
.lazy()
.group_by([by, "PositivesBin", "break_point"])
.group_by([by, "PositivesBin"])
.agg([pl.min("Positives"), pl.n_unique("ModelID").alias("ModelCount")])
.with_columns(
(pl.col("ModelCount") / (pl.sum("ModelCount").over(by))).alias(
"cumModels"
)
)
.sort("break_point")
.sort("Positives")
)

def get_model_stats(self, last: bool = True) -> dict:
@@ -1332,7 +1343,13 @@ def summary_by_channel(

# Removes whitespace and capitalizes names for matching
def name_normalizer(x):
return pl.col(x).str.replace_all(r"[ \-_]", "").str.to_uppercase()
return (
pl.col(x)
.cast(pl.Utf8)
.str.replace_all(r"[ \-_]", "")
.str.to_uppercase()
.cast(pl.Categorical)
)

directionMapping = pl.DataFrame(
# Standard directions have a 1:1 mapping to channel groups
@@ -1378,37 +1395,38 @@ def name_normalizer(x):
# all these expressions needed because not every customer has Treatments and
# polars can't aggregate literals, so we have to be careful to pass on explicit
# values when there are no treatments
model_data_cols = self.modelData.collect_schema().names()
treatmentIdentifierExpr = (
pl.concat_str(["Issue", "Group", "Name", "Treatment"], separator="/")
if "Treatment" in self.modelData.columns
if "Treatment" in model_data_cols
else pl.lit("")
)
activeTreatmentExpr = (
(
(pl.col("ResponseCount").sum() > 0)
& (pl.col("Treatment").is_not_null())
).over(["Issue", "Group", "Name", "Treatment"])
if "Treatment" in self.modelData.columns
if "Treatment" in model_data_cols
else pl.lit(False)
)
uniqueTreatmentExpr = (
treatmentIdentifierExpr.unique()
if "Treatment" in self.modelData.columns
if "Treatment" in model_data_cols
else pl.lit([])
)
uniqueTreatmentCountExpr = (
treatmentIdentifierExpr.n_unique()
if "Treatment" in self.modelData.columns
if "Treatment" in model_data_cols
else pl.lit(0)
)
uniqueUsedTreatmentExpr = (
treatmentIdentifierExpr.filter(pl.col("isUsedTreatment")).unique()
if "Treatment" in self.modelData.columns
if "Treatment" in model_data_cols
else pl.lit([])
)
uniqueUsedTreatmentCountExpr = (
treatmentIdentifierExpr.filter(pl.col("isUsedTreatment")).n_unique()
if "Treatment" in self.modelData.columns
if "Treatment" in model_data_cols
else pl.lit(0)
)

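
A short sketch (hypothetical data, not from the repository) of the remaining Polars 1.x renames this file adapts to: `group_by_dynamic(..., by=)` becomes `group_by=`, `pivot(..., columns=)` becomes `on=`, and `.str` operations are applied after casting a Categorical column to Utf8:

```python
from datetime import date
import polars as pl

df = pl.DataFrame(
    {
        "SnapshotTime": [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 2)],
        "Channel": ["Web", "Web", "Mobile"],
        "Increase": [1, 2, 3],
    }
)

# group_by_dynamic: the grouping argument is now called `group_by` (was `by`).
daily = (
    df.sort("SnapshotTime")
    .group_by_dynamic("SnapshotTime", every="1d", group_by="Channel")
    .agg(pl.sum("Increase"))
)

# pivot: the spreading column is now called `on` (was `columns`).
wide = df.pivot(index="SnapshotTime", on="Channel", values="Increase",
                aggregate_function="first")

# .str expressions expect a String column, so cast Categorical -> Utf8, apply
# the string operation, and cast back.
categorical = df.with_columns(pl.col("Channel").cast(pl.Categorical))
normalized = categorical.with_columns(
    pl.col("Channel").cast(pl.Utf8).str.to_uppercase().cast(pl.Categorical)
)
```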
