diff --git a/.github/workflows/Python tests.yml b/.github/workflows/Python tests.yml index a1d16b71..9817506c 100644 --- a/.github/workflows/Python tests.yml +++ b/.github/workflows/Python tests.yml @@ -17,21 +17,29 @@ jobs: name: pdstools (Python ${{ matrix.python-version }}) on ${{ matrix.os }} + strategy: fail-fast: true matrix: os: [ubuntu-latest, windows-latest, macOS-latest] - python-version: [3.8.x, 3.9.x, 3.11.x, 3.12.x] + python-version: [3.9.x, 3.10.x, 3.11.x, 3.12.x] exclude: - os: windows-latest - python-version: 3.8.x + python-version: 3.9.x + - os: windows-latest + python-version: 3.10.x - os: windows-latest + python-version: 3.11.x + # - os: windows-latest + # python-version: 3.12.x + - os: macOS-latest python-version: 3.9.x - os: macOS-latest - python-version: 3.8.x + python-version: 3.10.x - os: macOS-latest - python-version: 3.9.x - + python-version: 3.11.x + # - os: macOS-latest + # python-version: 3.12.x env: GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} @@ -42,6 +50,7 @@ jobs: uses: actions/setup-python@v4 with: python-version: ${{matrix.python-version}} + allow-prereleases: true - name: Upgrade pip run: | diff --git a/examples/articles/thompsonsampling.ipynb b/examples/articles/thompsonsampling.ipynb index 474958f2..d794d46c 100644 --- a/examples/articles/thompsonsampling.ipynb +++ b/examples/articles/thompsonsampling.ipynb @@ -79,9 +79,9 @@ "\n", "# Convergence of the Thompson Sampled propensities\n", "s = thompsonSamplingSimulation['positives']\n", - "thompsonSamplingSimulation2 = thompsonSamplingSimulation.hstack(s.cut(breaks=np.array(range(int(s.min()), int(s.max())+20, 20))-1, series=False).select(bin='category'))\n", + "thompsonSamplingSimulation2 = thompsonSamplingSimulation.hstack([s.cut(breaks=np.array(range(int(s.min()), int(s.max())+20, 20))-1).rename(\"bin\")])\n", "s = thompsonSamplingSimulation2.group_by(\"p\", \"bin\").agg(\n", - " n=pl.count(),\n", + " n=pl.len(),\n", " n90=(((pl.col(\"sampled_propensity\") - pl.col(\"p\")) / pl.col(\"p\")) < 0.1).sum(),\n", " positives=pl.min(\"positives\"),\n", ").with_columns(pct=pl.col('n90')/pl.col('n')).sort('p', 'bin').with_columns(pl.col('p').cast(pl.Utf8).cast(pl.Categorical))\n", @@ -194,7 +194,7 @@ " pl.lit(1.0),\n", " (\n", " pl.lit(ncust)\n", - " / pl.col(\"impressions\").where(pl.col(\"week\") == w).sum()\n", + " / pl.col(\"impressions\").filter(pl.col(\"week\") == w).sum()\n", " ),\n", " ]\n", " )\n", diff --git a/pyproject.toml b/pyproject.toml index ddd5b938..3f81337d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ dependencies = {file = ["python/requirements.txt"]} [project.optional-dependencies] docs = ['sphinx','furo','sphinx-autoapi','nbsphinx','sphinx-copybutton','myst-parser'] app = ['streamlit>=1.23', 'quarto', 'papermill', 'itables', 'jinja2>=3.1', 'xlsxwriter>=3.0', 'tabulate', 'st-pages'] +tests = ['testbook'] [project.urls] "Homepage" = "https://github.com/pegasystems/pega-datascientist-tools" diff --git a/python/pdstools/__init__.py b/python/pdstools/__init__.py index 1c0d1c70..7dd63b18 100644 --- a/python/pdstools/__init__.py +++ b/python/pdstools/__init__.py @@ -1,6 +1,6 @@ """Pega Data Scientist Tools Python library""" -__version__ = "3.4.8-beta" +__version__ = "3.5.0" from polars import enable_string_cache @@ -26,6 +26,34 @@ from .valuefinder.ValueFinder import ValueFinder if "streamlit" in sys.modules: - from .utils import streamlit_utils + from .utils import streamlit_utils as streamlit_utils __reports__ = Path(__file__).parents[0] / "reports" + +__all__ = [ + 
"ADMDatamart", + "ADMTrees", + "MultiTrees", + "BinAggregator", + "DecisionData", + "API", + "S3", + "Anonymization", + "File", + "get_token", + "readDSExport", + "setupAzureOpenAI", + "Prediction", + "NBAD", + "cdh_utils", + "datasets", + "errors", + "defaultPredictorCategorization", + "CDHLimits", + "CDHSample", + "SampleTrees", + "SampleValueFinder", + "show_versions", + "PegaDefaultTables", + "ValueFinder", +] diff --git a/python/pdstools/adm/ADMDatamart.py b/python/pdstools/adm/ADMDatamart.py index 8cd9ae06..3ff15dde 100644 --- a/python/pdstools/adm/ADMDatamart.py +++ b/python/pdstools/adm/ADMDatamart.py @@ -12,6 +12,7 @@ import polars as pl import yaml +from polars.exceptions import ColumnNotFoundError from .. import pega_io from ..plots.plot_base import Plots @@ -173,7 +174,9 @@ def __init__( ) if self.modelData is not None: missing_context_keys = [ - key for key in self.context_keys if key not in self.modelData.columns + key + for key in self.context_keys + if key not in self.modelData.collect_schema().names() ] if missing_context_keys: self.modelData = self.modelData.with_columns( @@ -307,7 +310,8 @@ def import_data( **reading_opts, ) if df2 is not None: - if "BinResponseCount" not in df2.columns: # pragma: no cover + df2_columns = df2.collect_schema().names() + if "BinResponseCount" not in df2_columns: # pragma: no cover df2 = df2.with_columns( (pl.col("BinPositives") + pl.col("BinNegatives")).alias( "BinResponseCount" @@ -323,7 +327,7 @@ def import_data( if ( self.predictorCategorization is not None - and "PredictorCategory" not in df2.columns + and "PredictorCategory" not in df2_columns ): if not isinstance(self.predictorCategorization, pl.Expr): self.predictorCategorization = self.predictorCategorization() @@ -332,7 +336,9 @@ def import_data( if df1 is not None and df2 is not None: total_missing = ( set(self.missing_model) - & set(self.missing_preds) - set(df1.columns) - set(df2.columns) + & set(self.missing_preds) + - set(df1.collect_schema().names()) + - set(df2_columns) ) - {"Treatment"} if len(total_missing) > 0 and verbose: # pragma: no cover print( @@ -436,7 +442,7 @@ def _import_utils( df = self._apply_query(df, self.query) if self.verbose: print(f"Query successful for {name}.") - except pl.ColumnNotFoundError: + except ColumnNotFoundError: if self.verbose: print( f"""Query unsuccessful for {name}. @@ -498,7 +504,9 @@ def _available_columns( "GroupIndex", } # NOTE: these default names are already capitalized properly, with py/px/pz removed. 
- rename = {i: "Name" for i in df.columns if i.lower() == "modelname"} + rename = { + i: "Name" for i in df.collect_schema().names() if i.lower() == "modelname" + } if len(rename) > 0: df = df.rename(rename) @@ -510,8 +518,10 @@ def _available_columns( ) include_cols = default_names.union(include_cols).difference(drop_cols) - missing = {col for col in include_cols if col not in df.columns} - to_import = include_cols.intersection(set(df.columns)) + missing = { + col for col in include_cols if col not in df.collect_schema().names() + } + to_import = include_cols.intersection(set(df.collect_schema().names())) return to_import, missing @@ -553,7 +563,7 @@ def _set_types( timestamp_fmt=timestamp_fmt, strict_conversion=strict_conversion, ) - if "SnapshotTime" not in df.columns: + if "SnapshotTime" not in df.collect_schema().names(): df = df.with_columns(SnapshotTime=None) return df @@ -592,7 +602,7 @@ def last( @staticmethod def _last(df: any_frame) -> any_frame: """Method to retrieve only the last snapshot.""" - if df.select("SnapshotTime").dtypes[0] == pl.datatypes.Null: + if df.select("SnapshotTime").collect_schema().dtypes()[0] == pl.datatypes.Null: return df return df.filter( @@ -740,24 +750,27 @@ def _apply_query( Filtered Polars DataFrame """ if isinstance(df, pl.DataFrame): + df_cols = df.columns df = df.lazy() + else: + df_cols = df.collect_schema().names() if query is not None: if isinstance(query, pl.Expr): - col_diff = set(query.meta.root_names()) - set(df.columns) + col_diff = set(query.meta.root_names()) - set(df_cols) if len(col_diff) == 0: return df.filter(query) else: - raise pl.ColumnNotFoundError(col_diff) + raise ColumnNotFoundError(col_diff) if isinstance(query, list): for item in query: if isinstance(item, pl.Expr): - col_diff = set(item.meta.root_names()) - set(df.columns) + col_diff = set(item.meta.root_names()) - set(df_cols) if len(col_diff) == 0: df = df.filter(item) else: - raise pl.ColumnNotFoundError(col_diff) + raise ColumnNotFoundError(col_diff) else: raise ValueError(item) return df @@ -809,7 +822,7 @@ def discover_modelTypes( """ if self.import_strategy != "eager" and allow_collect == False: raise NotEagerError("Discovering AGB models") - if "Modeldata" not in df.columns: + if "Modeldata" not in df.collect_schema().names(): raise ValueError( ( "Modeldata column not in the data. 
" @@ -949,7 +962,7 @@ def _create_sign_df( .alias("Daily_increase") .over("ModelID") ) - .group_by_dynamic("SnapshotTime", every=every, by=by) + .group_by_dynamic("SnapshotTime", every=every, group_by=by) .agg(pl.sum("Daily_increase").alias("Increase")) ) if pivot: @@ -957,7 +970,7 @@ def _create_sign_df( df.collect() .pivot( index="SnapshotTime", - columns=by, + on=by, values="Increase", aggregate_function="first", ) @@ -998,16 +1011,18 @@ def model_summary( required_columns = set(aggcols).union({by}) context_keys = kwargs.get("context_keys", self.context_keys) - assert required_columns.issubset(set(data.columns) | set(context_keys)) + assert required_columns.issubset( + set(data.collect_schema().names()) | set(context_keys) + ) return ( data.group_by(context_keys) .agg( [ - pl.count(by).suffix("_count"), - pl.col([aggcols[0], aggcols[3]]).sum().suffix("_sum"), - pl.col(aggcols).max().suffix("_max"), - pl.col(aggcols).mean().suffix("_mean"), + pl.count(by).name.suffix("_count"), + pl.col([aggcols[0], aggcols[3]]).sum().name.suffix("_sum"), + pl.col(aggcols).max().name.suffix("_max"), + pl.col(aggcols).mean().name.suffix("_mean"), (pl.col("ResponseCount") == 0) .sum() .alias("Count_without_responses"), @@ -1080,7 +1095,7 @@ def pivot_df( df.collect() .pivot( index=by, - columns="PredictorName", + on="PredictorName", values="PerformanceBin", aggregate_function="first", ) @@ -1155,24 +1170,20 @@ def models_by_positives_df( modelsByPositives = df.select([by, "Positives", "ModelID"]).collect() return ( - modelsByPositives.join( - modelsByPositives["Positives"].cut( + modelsByPositives.with_columns( + PositivesBin=modelsByPositives["Positives"].cut( breaks=list(range(0, 210, 10)), - series=False, - category_label="PositivesBin", - ), - on="Positives", - how="left", + ) ) .lazy() - .group_by([by, "PositivesBin", "break_point"]) + .group_by([by, "PositivesBin"]) .agg([pl.min("Positives"), pl.n_unique("ModelID").alias("ModelCount")]) .with_columns( (pl.col("ModelCount") / (pl.sum("ModelCount").over(by))).alias( "cumModels" ) ) - .sort("break_point") + .sort("Positives") ) def get_model_stats(self, last: bool = True) -> dict: @@ -1332,7 +1343,13 @@ def summary_by_channel( # Removes whitespace and capitalizes names for matching def name_normalizer(x): - return pl.col(x).str.replace_all(r"[ \-_]", "").str.to_uppercase() + return ( + pl.col(x) + .cast(pl.Utf8) + .str.replace_all(r"[ \-_]", "") + .str.to_uppercase() + .cast(pl.Categorical) + ) directionMapping = pl.DataFrame( # Standard directions have a 1:1 mapping to channel groups @@ -1378,9 +1395,10 @@ def name_normalizer(x): # all these expressions needed because not every customer has Treatments and # polars can't aggregate literals, so we have to be careful to pass on explicit # values when there are no treatments + model_data_cols = self.modelData.collect_schema().names() treatmentIdentifierExpr = ( pl.concat_str(["Issue", "Group", "Name", "Treatment"], separator="/") - if "Treatment" in self.modelData.columns + if "Treatment" in model_data_cols else pl.lit("") ) activeTreatmentExpr = ( @@ -1388,27 +1406,27 @@ def name_normalizer(x): (pl.col("ResponseCount").sum() > 0) & (pl.col("Treatment").is_not_null()) ).over(["Issue", "Group", "Name", "Treatment"]) - if "Treatment" in self.modelData.columns + if "Treatment" in model_data_cols else pl.lit(False) ) uniqueTreatmentExpr = ( treatmentIdentifierExpr.unique() - if "Treatment" in self.modelData.columns + if "Treatment" in model_data_cols else pl.lit([]) ) uniqueTreatmentCountExpr = ( 
treatmentIdentifierExpr.n_unique() - if "Treatment" in self.modelData.columns + if "Treatment" in model_data_cols else pl.lit(0) ) uniqueUsedTreatmentExpr = ( treatmentIdentifierExpr.filter(pl.col("isUsedTreatment")).unique() - if "Treatment" in self.modelData.columns + if "Treatment" in model_data_cols else pl.lit([]) ) uniqueUsedTreatmentCountExpr = ( treatmentIdentifierExpr.filter(pl.col("isUsedTreatment")).n_unique() - if "Treatment" in self.modelData.columns + if "Treatment" in model_data_cols else pl.lit(0) ) diff --git a/python/pdstools/adm/ADMTrees.py b/python/pdstools/adm/ADMTrees.py index f0cce425..9ebfacf1 100644 --- a/python/pdstools/adm/ADMTrees.py +++ b/python/pdstools/adm/ADMTrees.py @@ -472,7 +472,9 @@ def getGainsPerSplit(self) -> Tuple[Dict, pl.DataFrame, dict]: total_split_list = functools.reduce(operator.iconcat, splitlist, []) total_gains_list = functools.reduce(operator.iconcat, gainslist, []) gainsPerSplit = pl.DataFrame( - list(zip(total_split_list, total_gains_list)), schema=["split", "gains"] + list(zip(total_split_list, total_gains_list)), + schema=["split", "gains"], + orient="row", ) gainsPerSplit = gainsPerSplit.with_columns( predictor=pl.col("split").map_elements( @@ -552,7 +554,7 @@ def plotSplitsPerVariable(self, subset: Optional[Set] = None, show=True): plt.figure """ figlist = [] - for name, data in self.gainsPerSplit.group_by("predictor"): + for (name,), data in self.gainsPerSplit.group_by("predictor"): if (subset is not None and name in subset) or subset is None: fig = make_subplots() fig.add_trace( @@ -617,7 +619,7 @@ def getTreeStats(self) -> pl.DataFrame: def getAllValuesPerSplit(self) -> Dict: """Generate a dictionary with the possible values for each split""" splitvalues = {} - for name, group in self.groupedGainsPerSplit.group_by("predictor"): + for (name,), group in self.groupedGainsPerSplit.group_by("predictor"): if name not in splitvalues.keys(): splitvalues[name] = set() splitvalue = group.get_column("values").to_list() diff --git a/python/pdstools/adm/BinAggregator.py b/python/pdstools/adm/BinAggregator.py index 02e86622..14c81a5c 100644 --- a/python/pdstools/adm/BinAggregator.py +++ b/python/pdstools/adm/BinAggregator.py @@ -785,7 +785,7 @@ def plotBinningLift( binning = binning.lazy() # Add Lift column if not present - if "Lift" not in binning.columns: + if "Lift" not in binning.collect_schema().names(): binning = binning.with_columns( (lift(pl.col("BinPositives"), pl.col("BinNegatives")) - 1.0).alias( "Lift" @@ -793,7 +793,7 @@ def plotBinningLift( ) # Optionally a shading expression - if "BinPositives" in binning.columns: + if "BinPositives" in binning.collect_schema().names(): shading_expr = pl.col("BinPositives") <= 5 else: shading_expr = pl.lit(False) @@ -897,7 +897,9 @@ def plot_lift_binning(self, binning: pl.DataFrame) -> Figure: topic = binning.columns[-1] # assuming the roll-up column is the last one model_facet = ( # if there are multiple topics, take the topic otherwise assume not rolled up over a topic - topic if binning.select(pl.col(topic).n_unique() > 1).item() else None + topic + if binning.select(pl.col(topic).n_unique() > 1).item() + else None ) predictor_facet = ( # check if there are multiple predictors diff --git a/python/pdstools/decision_analyzer/decision_data.py b/python/pdstools/decision_analyzer/decision_data.py index b76652cd..fd8f83fe 100644 --- a/python/pdstools/decision_analyzer/decision_data.py +++ b/python/pdstools/decision_analyzer/decision_data.py @@ -208,8 +208,8 @@ def getPreaggregatedFilterView(self): 
.alias(f"Win_at_rank{i}") for i in range(1, self.max_win_rank + 1) ] + [ - pl.min(stats_cols).suffix("_min"), - pl.max(stats_cols).suffix("_max"), + pl.min(stats_cols).name.suffix("_min"), + pl.max(stats_cols).name.suffix("_max"), pl.col("FinalPropensity", "Priority").sample( n=num_samples, with_replacement=True, shuffle=True ), diff --git a/python/pdstools/decision_analyzer/utils.py b/python/pdstools/decision_analyzer/utils.py index e1a9acbd..2b4637fa 100644 --- a/python/pdstools/decision_analyzer/utils.py +++ b/python/pdstools/decision_analyzer/utils.py @@ -11,7 +11,7 @@ # As long as this is run once, anywhere, it's enabled globally. # Putting it here AND in the Home.py file should therefore be enough, # because every other file imports from utils.py (hence running this part too.) -pl.enable_string_cache(True) +pl.enable_string_cache() # superset, in order diff --git a/python/pdstools/plots/plot_base.py b/python/pdstools/plots/plot_base.py index 8b29187e..642af498 100644 --- a/python/pdstools/plots/plot_base.py +++ b/python/pdstools/plots/plot_base.py @@ -193,19 +193,20 @@ def _subset_data( ) df = getattr(self, table) + df_columns = df.collect_schema().names() if table != "predictorData": required_columns = required_columns.union(self.context_keys) assert required_columns.issubset( - df.columns - ), f"The following columns are missing in the data: {required_columns - set(df.columns)}" + df_columns + ), f"The following columns are missing in the data: {required_columns - set(df_columns)}" df = self._apply_query(df, query) if last: df = self.last(df, "lazy") - if active_only and "PredictorName" in df.columns: + if active_only and "PredictorName" in df.collect_schema().names(): df = df.filter(pl.col("EntryType") == "Active") df, facets = self._generateFacets(df, facets) diff --git a/python/pdstools/plots/plots_plotly.py b/python/pdstools/plots/plots_plotly.py index b2cc3fe3..40754d1f 100644 --- a/python/pdstools/plots/plots_plotly.py +++ b/python/pdstools/plots/plots_plotly.py @@ -491,8 +491,8 @@ def PredictorPerformanceHeatmap(self, df, facet=None, **kwargs): import plotly from packaging import version - assert ( - version.parse(plotly.__version__) >= version.parse("5.5.0") + assert version.parse(plotly.__version__) >= version.parse( + "5.5.0" ), f"Visualisation requires plotly version 5.5.0 or later (you have version {plotly.__version__}): please upgrade to a newer version." 
title = "over all models" if facet is None else f"per {facet}" @@ -620,9 +620,9 @@ def ModelsByPositives(self, df, by="Channel", **kwargs): labels={"cumModels": "Percentage of Models", "PositivesBin": "Positives"}, template="pega", category_orders={ - "PositivesBin": df.select("PositivesBin", "break_point") + "PositivesBin": df.select("Positives", "PositivesBin") .unique() - .sort("break_point")["PositivesBin"] + .sort("Positives")["PositivesBin"] .to_list() }, ) diff --git a/python/pdstools/utils/cdh_utils.py b/python/pdstools/utils/cdh_utils.py index b1bc2d3f..4841bb76 100644 --- a/python/pdstools/utils/cdh_utils.py +++ b/python/pdstools/utils/cdh_utils.py @@ -92,7 +92,7 @@ def _extract_keys( The dataframe to extract the keys from """ # Checking for the 'column is None/Null' case - if df.schema[col] != pl.Utf8: + if df.collect_schema()[col] != pl.Utf8: return df if import_strategy != "eager": @@ -212,7 +212,7 @@ def getMapping(columns, reverse=False): else: return dict(zip(_capitalize(columns), columns)) - named = getMapping(df.columns) + named = getMapping(df.collect_schema().names()) typed = getMapping( [col for col in dir(definition) if not col.startswith("__")], reverse=True ) @@ -221,7 +221,7 @@ def getMapping(columns, reverse=False): for col, renamedCol in named.items(): try: new_type = getattr(definition, typed[renamedCol]) - original_type = df.schema[col].base_type() + original_type = df.collect_schema()[col].base_type() if original_type == pl.Null: if verbose: warnings.warn(f"Warning: {col} column is Null data type.") @@ -263,7 +263,7 @@ def set_types(df, table="infer", verbose=False, **timestamp_opts): def inferTableDefinition(df): - cols = _capitalize(df.columns) + cols = _capitalize(df.collect_schema().names()) vf = ["Propensity", "Stage"] predictors = ["PredictorName", "ModelID", "BinSymbol"] models = ["ModelID", "Performance"] @@ -326,8 +326,8 @@ def auc_from_probs( if nlabels > 2: raise Exception("'Groundtruth' has more than two levels.") - df = pl.DataFrame({"truth": groundtruth, "probs": probs}) - binned = df.group_by(by="probs").agg( + df = pl.DataFrame({"truth": groundtruth, "probs": probs}, strict=False) + binned = df.group_by(probs="probs").agg( [ (pl.col("truth") == 1).sum().alias("pos"), (pl.col("truth") == 0).sum().alias("neg"), @@ -405,7 +405,7 @@ def aucpr_from_probs( raise Exception("'Groundtruth' has more than two levels.") df = pl.DataFrame({"truth": groundtruth, "probs": probs}) - binned = df.group_by(by="probs").agg( + binned = df.group_by(probs="probs").agg( [ (pl.col("truth") == 1).sum().alias("pos"), (pl.col("truth") == 0).sum().alias("neg"), @@ -543,11 +543,12 @@ def _capitalize(fields: list) -> list: def _polarsCapitalize(df: pl.LazyFrame): + df_cols = df.collect_schema().names() return df.rename( dict( zip( - df.columns, - _capitalize(df.columns), + df_cols, + _capitalize(df_cols), ) ) ) diff --git a/python/pdstools/utils/hds_utils_experimental.py b/python/pdstools/utils/hds_utils_experimental.py new file mode 100644 index 00000000..11468caa --- /dev/null +++ b/python/pdstools/utils/hds_utils_experimental.py @@ -0,0 +1,597 @@ +from pathlib import Path +import polars as pl +import numpy as np +import glob +import os +import re +from typing import Literal, Optional +from itertools import chain +import json +from tqdm.auto import tqdm + + +class Config: + """Configuration file for the data anonymizer. + + Parameters + ---------- + config_file: str = None + An optional path to a config file + hds_folder: Path = "." 
+        The path to the hds files
+    use_datamart: bool = False
+        Whether to use the datamart to infer predictor types
+    datamart_folder: Path = "datamart"
+        The folder of the datamart files
+    output_format: Literal["ndjson", "parquet", "arrow", "csv"] = "ndjson"
+        The output format to write the files in
+    output_folder: Path = "output"
+        The location to write the files to
+    mapping_file: str = "mapping.map"
+        The name of the predictor mapping file
+    mask_predictor_names: bool = False
+        Whether to mask the names of regular predictors
+    mask_context_key_names: bool = False
+        Whether to mask the names of context key predictors
+    mask_ih_names: bool = True
+        Whether to mask the name of Interaction History summary predictors
+    mask_outcome_name: bool = False
+        Whether to mask the name of the outcome column
+    mask_predictor_values: bool = True
+        Whether to mask the values of regular predictors
+    mask_context_key_values: bool = False
+        Whether to mask the values of context key predictors
+    mask_ih_values: bool = True
+        Whether to mask the values of Interaction History summary predictors
+    mask_outcome_values: bool = True
+        Whether to mask the values of the outcomes to binary
+    context_key_label: str = "Context_*"
+        The pattern of names for context key predictors
+    ih_label: str = "IH_*"
+        The pattern of names for Interaction History summary predictors
+    outcome_column: str = "Decision_Outcome"
+        The name of the outcome column
+    positive_outcomes: list = ["Accepted", "Clicked"]
+        Which positive outcomes to map to True
+    negative_outcomes: list = ["Rejected", "Impression"]
+        Which negative outcomes to map to False
+    special_predictors: list = ["Decision_DecisionTime", "Decision_OutcomeTime", "Decision_Rank"]
+        A list of special predictors which are not touched
+    sample_percentage_schema_inferencing: float = 0.01
+        The percentage of records to sample to infer the column type.
+        In case you're getting casting errors, it may be useful to
+        increase this percentage to check a larger portion of data.
+    """
+
+    def __init__(
+        self,
+        config_file: Optional[str] = None,
+        hds_folder: Path = ".",
+        use_datamart: bool = False,
+        datamart_folder: Path = "datamart",
+        output_format: Literal["ndjson", "parquet", "arrow", "csv"] = "ndjson",
+        output_folder: Path = "output",
+        mapping_file: str = "mapping.map",
+        mask_predictor_names: bool = False,
+        mask_context_key_names: bool = False,
+        mask_ih_names: bool = True,
+        mask_outcome_name: bool = False,
+        mask_predictor_values: bool = True,
+        mask_context_key_values: bool = False,
+        mask_ih_values: bool = True,
+        mask_outcome_values: bool = True,
+        context_key_label: str = "Context_*",
+        ih_label: str = "IH_*",
+        outcome_column: str = "Decision_Outcome",
+        positive_outcomes: list = ["Accepted", "Clicked"],
+        negative_outcomes: list = ["Rejected", "Impression"],
+        special_predictors: list = [
+            "Decision_DecisionTime",
+            "Decision_OutcomeTime",
+            "Decision_Rank",
+        ],
+        sample_percentage_schema_inferencing: float = 0.01,
+    ):
+        self._opts = {key: value for key, value in vars().items() if key != "self"}
+
+        if config_file is not None:
+            self.load_from_config_file(config_file)
+        for key, value in self._opts.items():
+            setattr(self, key, value)
+        self.validate_paths()
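+
+    # Note on the defaults above: only regular predictor values, IH predictors
+    # (name and value) and outcome values are masked; predictor, context key and
+    # outcome *names* stay readable, as do context key values. Because `_opts` is
+    # captured before `load_from_config_file` runs, a file passed via
+    # `config_file` takes precedence over these keyword defaults.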
+
+    def load_from_config_file(self, config_file: Path):
+        """Load the configurations from a file.
+
+        Parameters
+        ----------
+        config_file : Path
+            The path to the configuration file
+        """
+        if not os.path.exists(config_file):
+            raise ValueError("Config file does not exist.")
+        with open(config_file) as f:
+            self._opts = json.load(f)
+
+    def save_to_config_file(self, file_name: str = None):
+        """Save the configurations to a file.
+
+        Parameters
+        ----------
+        file_name : str
+            The name of the configuration file
+        """
+        with open("config.json" if file_name is None else file_name, "w") as f:
+            json.dump(self._opts, f)
+
+    def validate_paths(self):
+        """Validate the outcome folder exists."""
+        if not os.path.exists(self.output_folder):
+            os.mkdir(self.output_folder)
+
+
+class DataAnonymization2:
+    def __init__(self, files, config=None, **config_args):
+        self.config = Config(**config_args) if config is None else config
+
+        if isinstance(files, str) or isinstance(files, Path):
+            self.df = self.getFiles(files)
+            cols = [df.columns for df in self.df.values()]
+            self.allColumns = set(chain(*cols))
+        else:
+            self.df = files.lazy()
+            self.allColumns = set(files.columns)
+
+        self.typeMapping = None
+        self.numericRanges = None
+
+    def inferTypes(self, number_of_files_to_sample=20):
+        if isinstance(self.df, pl.LazyFrame):
+            # A single LazyFrame has to be materialized before the per-column
+            # cast check below can run, and the result must be stored.
+            self.typeMapping = self._inferTypes(self.df.collect())
+        elif isinstance(self.df, dict):
+            if number_of_files_to_sample > len(self.df):
+                number_of_files_to_sample = len(self.df)
+
+            filesToCheck = np.round(
+                np.linspace(0, len(self.df) - 1, number_of_files_to_sample)
+            ).astype(int)
+
+            filesToCheck = [list(self.df.values())[i] for i in filesToCheck]
+
+            for column in self.allColumns:
+                files_with_column = [
+                    f for f in list(self.df.values()) if column in f.columns
+                ]
+                number_of_sampled_files_with_column = len(
+                    [f for f in filesToCheck if column in f.columns]
+                )
+                if number_of_sampled_files_with_column < 5:
+                    if len(files_with_column) > 5:
+                        sampledFiles = np.round(
+                            np.linspace(0, len(files_with_column) - 1, 5)
+                        ).astype(int)
+                        # Sample from the files that actually contain the column
+                        filesToCheck += [files_with_column[i] for i in sampledFiles]
+                    else:
+                        filesToCheck += files_with_column
+
+            df = pl.concat(filesToCheck, how="diagonal").collect()
+            self.typeMapping = self._inferTypes(df)
+
+    def _inferTypes(self, df):
+        types = dict()
+        for col in df.columns:
+            try:
+                ser = df.get_column(col)
+                try:
+                    ser = ser.replace("", None)
+                except Exception:
+                    pass
+                ser.cast(pl.Float64)
+                types[col] = "numeric"
+            except Exception:
+                types[col] = "symbolic"
+
+        return types
+
+    def setPredictorTypes(self, typeDict: dict):
+        self.typeMapping = typeDict
+
+    def setTypeForPredictor(self, predictor, type: Literal["numeric", "symbolic"]):
+        self.typeMapping[predictor] = type
+
+    def getPredictorsWithTypes(self):
+        return self.typeMapping
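+
+    # Rough usage sketch (paths and column names hypothetical): point the class
+    # at a folder of non-empty .json HDS exports, let it infer or receive the
+    # predictor types, then anonymize and write out:
+    #     anon = DataAnonymization2("path/to/hds")
+    #     anon.inferTypes()  # or anon.setPredictorTypes({"Customer_Age": "numeric"})
+    #     anon.write_to_output(ext="parquet")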
+
+    def getFiles(self, orig_files):
+        """Load the historical dataset files from the given folder."""
+        files = glob.glob(f"{orig_files}/*.json")
+        files = [f for f in files if os.path.getsize(f) > 0]
+
+        if len(files) < 1:
+            raise FileNotFoundError(
+                """Files not found. Please check the `hds_folder`
+                and check if there are `.json` files in there.
+                If preferred, you can also supply a polars Dataframe or Lazyframe
+                directly with the `files` argument."""
+            )
+        return {f: pl.scan_ndjson(f) for f in files}
+
+    def create_mapping_file(self):
+        """Create a file to write the column mapping"""
+        if not hasattr(self, "column_mapping"):
+            self.create_predictor_mapping()
+        with open(self.config.mapping_file, "w") as wr:
+            for key, mapped_val in self.column_mapping.items():
+                wr.write(f"{key}={mapped_val}")
+                wr.write("\n")
+
+    def get_columns_by_type(self):
+        """Get a list of columns for each type."""
+        columns = self.allColumns
+        columns_ = ",".join(columns)
+
+        exp = "[^,]+"
+        context_keys_t = re.findall(f"{self.config.context_key_label}{exp}", columns_)
+        ih_predictors_t = re.findall(f"{self.config.ih_label}{exp}", columns_)
+
+        special_predictors_t = []
+        for exc in self.config.special_predictors:
+            for e in re.findall(exc, columns_):
+                special_predictors_t.append(e)
+
+        outcome_column_t = [self.config.outcome_column]
+        predictors_t = list(
+            set(columns)
+            - set(
+                context_keys_t
+                + ih_predictors_t
+                + special_predictors_t
+                + outcome_column_t
+            )
+        )
+        return context_keys_t, ih_predictors_t, special_predictors_t, predictors_t
+
+    def create_predictor_mapping(self):
+        """Map the predictor names to their anonymized form."""
+        symbolic_predictors_to_mask, numeric_predictors_to_mask = [], []
+
+        (
+            context_keys_t,
+            ih_predictors_t,
+            special_predictors_t,
+            predictors_t,
+        ) = self.get_columns_by_type()
+
+        if self.typeMapping is None:
+            self.inferTypes()
+
+        outcome_t = self.config.outcome_column
+
+        outcome_column = {}
+        context_keys = {}
+        ih_predictors = {}
+        predictors = {}
+
+        for idx, val in enumerate(context_keys_t):
+            if self.config.mask_context_key_values:
+                if self.typeMapping[val] == "symbolic":
+                    symbolic_predictors_to_mask.append(val)
+                else:
+                    numeric_predictors_to_mask.append(val)
+            context_keys[val] = (
+                f"CK_PREDICTOR_{idx}" if self.config.mask_context_key_names else val
+            )
+
+        for idx, val in enumerate(ih_predictors_t):
+            if self.config.mask_ih_values:
+                if self.typeMapping[val] == "symbolic":
+                    symbolic_predictors_to_mask.append(val)
+                else:
+                    numeric_predictors_to_mask.append(val)
+            ih_predictors[val] = (
+                f"IH_PREDICTOR_{idx}" if self.config.mask_ih_names else val
+            )
+
+        for idx, val in enumerate(predictors_t):
+            if self.config.mask_predictor_values:
+                if self.typeMapping[val] == "symbolic":
+                    symbolic_predictors_to_mask.append(val)
+                else:
+                    numeric_predictors_to_mask.append(val)
+            predictors[val] = (
+                f"PREDICTOR_{idx}" if self.config.mask_predictor_names else val
+            )
+
+        special_predictors = {x: x for x in special_predictors_t}
+        if "Decision_SubjectID" in special_predictors_t:
+            symbolic_predictors_to_mask.append("Decision_SubjectID")
+
+        outcome_column[outcome_t] = (
+            "OUTCOME" if self.config.mask_outcome_name else outcome_t
+        )
+
+        self.column_mapping = {
+            **predictors,
+            **context_keys,
+            **ih_predictors,
+            **special_predictors,
+            **outcome_column,
+        }
+        self.symbolic_predictors_to_mask = symbolic_predictors_to_mask
+        self.numeric_predictors_to_mask = numeric_predictors_to_mask
+
+    def calculateNumericRanges(self, verbose=False):
+        def minMaxFrame(df, columns_to_check):
+            col = pl.col(columns_to_check).replace("", None).cast(pl.Float64)
+            return df.select(
+                col.min().name.suffix("_min"),
+                col.max().name.suffix("_max"),
+            ).collect()
+
+        if not hasattr(self, "numeric_predictors_to_mask"):
+            self.create_predictor_mapping()
+
+        if len(self.numeric_predictors_to_mask) == 0:
+            self.ranges = {}
+            return None
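+
+        # When ranges already exist (computed earlier or supplied through
+        # setNumericRanges), only the numeric columns that still lack a
+        # min/max need to be scanned below.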
+        if hasattr(self, "ranges"):
+            if all(col in self.ranges for col in self.numeric_predictors_to_mask):
+                return None
+            else:
+                columns_to_check = list(
+                    col
+                    for col in self.numeric_predictors_to_mask
+                    if col not in self.ranges.keys()
+                )
+        else:
+            columns_to_check = self.numeric_predictors_to_mask
+
+        if isinstance(self.df, pl.LazyFrame):
+            try:
+                rangeFrame = minMaxFrame(self.df, columns_to_check)
+            except Exception as e:
+                if verbose:
+                    print("Error when building MinMaxFrame: ", e)
+                raise
+        else:
+            rangeList = []
+            for file in tqdm(
+                self.df.values(), desc="Calculating numeric min/max ranges"
+            ):
+                try:
+                    rangeList.append(minMaxFrame(file, columns_to_check))
+                except Exception as e:
+                    print(f"Exception {e} for file {file}.")
+            if verbose:
+                print("Calculated all min/max ranges")
+            rangeFrame = pl.concat(rangeList, how="diagonal")
+            if verbose:
+                print("Combined them into a single frame")
+
+        out = rangeFrame.select(
+            pl.col("^.*_min$").cast(pl.Float64).min(),
+            pl.col("^.*_max$").cast(pl.Float64).max(),
+        ).to_dict(as_series=False)
+        minmax = {}
+        # Iterate over a copy: columns that fail to parse are moved from the
+        # numeric to the symbolic set while we loop.
+        for col in list(columns_to_check):
+            try:
+                minmax[col] = {
+                    "min": float(out[f"{col}_min"][0]),
+                    "max": float(out[f"{col}_max"][0]),
+                }
+            except (KeyError, ValueError, TypeError) as e:
+                if verbose:
+                    print(
+                        f'Unable to parse column {col}. Min value found is {out.get(f"{col}_min")}, max value {out.get(f"{col}_max")}. Setting to categorical. Error: {e}'
+                    )
+                self.setTypeForPredictor(col, "symbolic")
+                self.numeric_predictors_to_mask.remove(col)
+                self.symbolic_predictors_to_mask.append(col)
+
+        # Merge with any previously known ranges instead of overwriting them.
+        self.ranges = {**getattr(self, "ranges", {}), **minmax}
+
+    def setNumericRanges(self, rangeDict: dict):
+        if not hasattr(self, "ranges"):
+            self.ranges = {}
+        for key, value in rangeDict.items():
+            self.ranges[key] = value
+
+    def setRangeForPredictor(self, predictorName, min, max):
+        if not hasattr(self, "ranges"):
+            self.ranges = {}
+        self.ranges[predictorName] = {"min": min, "max": max}
+
+    def write_to_output(
+        self,
+        df: Optional[pl.DataFrame] = None,
+        ext: Optional[Literal["ndjson", "parquet", "arrow", "csv"]] = None,
+        mode: Literal["optimized", "robust"] = "optimized",
+        on_failed_file: Literal["warn", "ignore", "fail"] = "fail",
+        verbose=False,
+    ):
+        """Write the processed dataframe to an output file.
+
+        Parameters
+        ----------
+        df : Optional[pl.DataFrame]
+            Dataframe to write.
+            If not provided, runs `self.process()`
+        ext : Optional[Literal["ndjson", "parquet", "arrow", "csv"]]
+            What extension to write the file to
+        mode : Literal['optimized', 'robust'], default = 'optimized'
+            Whether to output a single file (optimized) or maintain
+            the same file structure as the original files (robust).
+            Optimized should be faster, but robust should allow for bigger
+            data as we don't need all data in memory at the same time.
+        on_failed_file : Literal["warn", "ignore", "fail"], default = 'fail'
+            What to do when a file cannot be processed in robust mode
+        """
+        out_filename = f"{self.config.output_folder}/hds"
+        out_format = self.config.output_format if ext is None else ext
+
+        if df is None:
+            df = self.process(strategy="lazy")
+        if mode == "robust":
+            if isinstance(self.df, pl.LazyFrame):
+                mode = "optimized"
+            else:
+                if not hasattr(self, "ranges"):
+                    self.calculateNumericRanges(verbose=verbose)
+                for filename, df in tqdm(self.df.items(), desc="Anonymizing..."):
+                    newName = Path(out_filename) / Path(filename).name
+                    if not os.path.exists(newName.parent):
+                        os.mkdir(newName.parent)
+                    init_df = self.process(df, verbose=verbose)
+                    if init_df is None:
+                        if on_failed_file == "warn":
+                            print(f"Failed to parse {filename}. 
Continuing") + if on_failed_file == "fail": + raise Exception( + f'Could not parse {filename}. Set on_failed_file to "warn" to continue' + ) + else: + init_df.write_ndjson(newName) + + if mode == "optimized": + df = df.collect() + if out_format == "ndjson": + df.write_ndjson(f"{out_filename}.json") + elif out_format == "parquet": + df.write_parquet(f"{out_filename}.parquet") + elif out_format == "arrow": + df.write_ipc(f"{out_filename}.arrow") + else: + df.write_csv(f"{out_filename}.csv") + + def getHasher( + self, + cols, + algorithm="xxhash", + seed="random", + seed_1=None, + seed_2=None, + seed_3=None, + ): + if algorithm == "xxhash": + if seed == "random": + if not hasattr(self, "seeds"): + from random import randint + + self.seeds = dict( + seed=randint(0, 1000000000), + seed_1=randint(0, 1000000000), + seed_2=randint(0, 1000000000), + seed_3=randint(0, 1000000000), + ) + + return pl.col(cols).hash(**self.seeds) + + else: + return pl.col(cols).map_elements(algorithm) + + def to_normalize(self, cols, verbose=False): + self.normalizationFailures, exprs = [], [] + for col in cols: + try: + exprs.append( + (pl.col(col) - pl.lit(float(self.ranges[col]["min"]))) + / pl.lit( + float(self.ranges[col]["max"]) - float(self.ranges[col]["min"]) + ) + ) + except Exception as e: + if verbose: + print(f"Error when trying to normalize {col}: {e}") + self.normalizationFailures.append(col) + return exprs + + def process( + self, + df=None, + strategy="eager", + verbose=False, + **kwargs, + ): + """Anonymize the dataset.""" + if self.typeMapping is None: + self.inferTypes() + if not hasattr(self, "column_mapping"): + self.create_predictor_mapping() + + if df is None: + if isinstance(self.df, dict): + df = pl.concat(self.df.values()) + else: + df = self.df + + column_mapping = { + key: val for key, val in self.column_mapping.items() if key in df.columns + } + + if not hasattr(self, "ranges"): + self.calculateNumericRanges(verbose=verbose) + + def to_hash(cols, **kwargs): + hasher = self.getHasher(cols, **kwargs) + return ( + pl.when( + (pl.col(cols).is_not_null()) + & (pl.col(cols).cast(pl.Utf8) != pl.lit("")) + & (pl.col(cols).cast(pl.Utf8).is_in(["true", "false"]).not_()) + ) + .then(hasher) + .otherwise(pl.col(cols)) + ) + + def to_boolean(col, positives): + return ( + pl.when(pl.col(col).cast(pl.Utf8).is_in(positives)) + .then(True) + .otherwise(False) + ).alias(col) + + numerics_in_file = [ + col for col in self.numeric_predictors_to_mask if col in df.columns + ] + symbolics_in_file = [ + col for col in self.symbolic_predictors_to_mask if col in df.columns + ] + + try: + df = df.with_columns( + pl.col(numerics_in_file).replace("", None).cast(pl.Float64) + ).with_columns( + [ + to_hash( + symbolics_in_file, + **kwargs, + ), + *self.to_normalize(numerics_in_file, verbose=verbose), + ] + ) + if self.config.mask_outcome_values: + df = df.with_columns( + to_boolean( + self.config.outcome_column, self.config.positive_outcomes + ) + ) + if len(self.normalizationFailures) > 0 and verbose: + print( + f"{len(self.normalizationFailures)} could not be normalized: {self.normalizationFailures}" + ) + column_mapping = { + key: val + for key, val in column_mapping.items() + if key not in self.normalizationFailures + } + df = df.select(column_mapping.keys()).rename(column_mapping) + + if strategy == "eager": + return df.lazy().collect() + return df + except Exception as e: + if verbose: + print(f"Error when anonymizing: {e}. 
Available columns: {df.columns}.") + return None diff --git a/python/pdstools/valuefinder/ValueFinder.py b/python/pdstools/valuefinder/ValueFinder.py index 5591c01b..4b0fef35 100644 --- a/python/pdstools/valuefinder/ValueFinder.py +++ b/python/pdstools/valuefinder/ValueFinder.py @@ -519,7 +519,7 @@ def plotPieCharts( .partition_by("pyStage", as_dict=True) ) for i, stage in enumerate(self.NBADStages): - plotdf = df[stage].drop("pyStage") + plotdf = [f for k, f in df.items() if k[0] == stage][0].drop("pyStage") fig.add_trace( go.Pie( values=list(plotdf.to_numpy())[0], diff --git a/python/requirements.txt b/python/requirements.txt index fbdf7ca2..d70504da 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -5,9 +5,9 @@ plotly>=5.5.0 requests pydot tqdm -polars~=0.20.0 +polars>=1.9 pyarrow pyyaml aioboto3>=11.0 great_tables -polars-hash \ No newline at end of file +polars-hash diff --git a/python/tests/test_ADMDatamart.py b/python/tests/test_ADMDatamart.py index 40ca4756..10506f12 100644 --- a/python/tests/test_ADMDatamart.py +++ b/python/tests/test_ADMDatamart.py @@ -7,6 +7,7 @@ import zipfile import polars as pl from polars.testing import assert_frame_equal +from polars.exceptions import ComputeError, InvalidOperationError, ColumnNotFoundError import itertools from pandas.errors import UndefinedVariableError import pathlib @@ -90,7 +91,7 @@ def test_import_utils_with_importing(test): name="Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip", ) assert isinstance(output, pl.LazyFrame) - assert len(output.columns) == 10 + assert len(output.collect_schema().names()) == 10 assert output.select(pl.len()).collect().item() == 20 assert renamed == { "Channel", @@ -139,7 +140,8 @@ def data(): "2022-03-01 05:00:00", "2022-03-01 05:00:00", ], - } + }, + strict=False, ) @@ -168,7 +170,7 @@ def test_import_no_subset(test, data): subset=False, typesetting_table="ADMModelSnapshot", )[0] - assert "Junk" in output.columns + assert "Junk" in output.collect_schema().names() def test_extract_treatment(test, data): @@ -217,7 +219,7 @@ def test_apply_query(test, data): for frame1, frame2 in itertools.combinations(frames, 2) ] - with pytest.raises(pl.ColumnNotFoundError): + with pytest.raises(ColumnNotFoundError): test._apply_query(data, pl.col("TEST") > 0) with pytest.raises(ValueError): @@ -238,9 +240,10 @@ def test_set_types(test): "2022-03-01 05:00:00", "2022-14-01 05:00:00", ], - } + }, + strict=False, ) - with pytest.raises(pl.ComputeError): + with pytest.raises((ComputeError, InvalidOperationError)): test._set_types( df, timestamp_fmt="%Y%m%dT%H%M%S", @@ -262,8 +265,8 @@ def test_set_types(test): pl.Datetime, ] - assert df2["Positives"].to_list() == [None, None, 3] - assert df2["Negatives"].to_list() == [0.0, None, None] + assert df2["Positives"].to_list() == [1, 2, 3] + assert df2["Negatives"].to_list() == [0, 2, 4] assert df2["Issue"].to_list() == ["Issue1", "Issue2", None] import datetime @@ -444,8 +447,8 @@ def test_create_sign_df(): # TODO: make this a good test rather than test for no fail -def test_model_summary(): - pass +def test_model_summary(test): + test.model_summary() def test_pivot_df(test): diff --git a/python/tests/test_cdh_utils.py b/python/tests/test_cdh_utils.py index 07ba458d..52933522 100644 --- a/python/tests/test_cdh_utils.py +++ b/python/tests/test_cdh_utils.py @@ -185,34 +185,41 @@ def test_weighted_average_polars(): assert output.equals(expected_output) + def test_overlap_lists_polars_simple(): input = pl.DataFrame( - { - "Actions" : [['a', 
'b'], ['a', 'c', 'd']], - "Valid" : [True, True] - } + {"Actions": [["a", "b"], ["a", "c", "d"]], "Valid": [True, True]} ) - assert cdh_utils.overlap_lists_polars(input['Actions'], input['Valid']) == [0.5, 1.0/3] + assert cdh_utils.overlap_lists_polars(input["Actions"], input["Valid"]) == [ + 0.5, + 1.0 / 3, + ] + def test_overlap_lists_polars_more(): input = pl.DataFrame( { - "Actions" : [['a', 'b'], ['b'], ['a', 'b'], ['a', 'c', 'd']], - "Valid" : [True, True, False, True], - "Valid2" : [True, True, True, True] + "Actions": [["a", "b"], ["b"], ["a", "b"], ["a", "c", "d"]], + "Valid": [True, True, False, True], + "Valid2": [True, True, True, True], } ) - results = cdh_utils.overlap_lists_polars(input['Actions'], input['Valid']) - expected_results = [0.5, 0.5, float("nan"), 1.0/6] + results = cdh_utils.overlap_lists_polars(input["Actions"], input["Valid"]) + expected_results = [0.5, 0.5, float("nan"), 1.0 / 6] for i in range(len(expected_results)): - assert (results[i] == expected_results[i]) or (np.isnan(results[i]) and np.isnan(expected_results[i])) + assert (results[i] == expected_results[i]) or ( + np.isnan(results[i]) and np.isnan(expected_results[i]) + ) - results = cdh_utils.overlap_lists_polars(input['Actions'], input['Valid2']) - expected_results = [2.0/3, 2.0/3, 2.0/3, 2.0/9] + results = cdh_utils.overlap_lists_polars(input["Actions"], input["Valid2"]) + expected_results = [2.0 / 3, 2.0 / 3, 2.0 / 3, 2.0 / 9] for i in range(len(expected_results)): - assert (results[i] == expected_results[i]) or (np.isnan(results[i]) and np.isnan(expected_results[i])) + assert (results[i] == expected_results[i]) or ( + np.isnan(results[i]) and np.isnan(expected_results[i]) + ) + def test_weighted_performance_polars(): input = pl.DataFrame( @@ -270,13 +277,15 @@ def test_lift(): { "BinPositives": [0, 7, 11, 12, 6], "BinNegatives": [5, 2208, 1919, 1082, 352], - } + }, ) output = input.with_columns(cdh_utils.lift()).with_columns(pl.col("Lift").round(7)) vals = [0, 0.4917733, 0.8869027, 1.7068860, 2.6080074] - expected_output = input.with_columns(pl.Series(name="Lift", values=vals)) + expected_output = input.with_columns( + pl.Series(name="Lift", values=vals, strict=False) + ) assert output.equals(expected_output) @@ -538,4 +547,4 @@ def test_legend_color_order(): input_fig = datasets.CDHSample().plotPerformanceSuccessRateBubbleChart() output_fig = cdh_utils.legend_color_order(input_fig) - assert output_fig.data[0].marker.color == "#001F5F" \ No newline at end of file + assert output_fig.data[0].marker.color == "#001F5F" diff --git a/python/tests/test_end_to_end.py b/python/tests/test_end_to_end.py index 0c0f80da..4eefb2b8 100644 --- a/python/tests/test_end_to_end.py +++ b/python/tests/test_end_to_end.py @@ -21,7 +21,7 @@ class Shape: """ def __new__(cls, ldf: pl.LazyFrame): - return (ldf.select(pl.first().len()).collect().item(), len(ldf.columns)) + return (ldf.select(pl.first().len()).collect().item(), len(ldf.collect_schema().names())) def test_end_to_end():