From 27f99fbc577170add5a3ce6606cb24742fdada1b Mon Sep 17 00:00:00 2001 From: FBurkhardt Date: Tue, 7 Nov 2023 15:02:35 +0100 Subject: [PATCH] 0.68.0 --- CHANGELOG.md | 6 +- nkululeko/constants.py | 4 +- nkululeko/data/dataset.py | 216 ++++---- nkululeko/split/__init__.py | 3 + nkululeko/split/example_binning.py | 27 + nkululeko/split/example_trainDevTestSplit.py | 81 +++ nkululeko/split/example_trainTestSplit.py | 77 +++ nkululeko/split/split_utils.py | 528 +++++++++++++++++++ 8 files changed, 849 insertions(+), 93 deletions(-) create mode 100644 nkululeko/split/__init__.py create mode 100644 nkululeko/split/example_binning.py create mode 100644 nkululeko/split/example_trainDevTestSplit.py create mode 100644 nkululeko/split/example_trainTestSplit.py create mode 100644 nkululeko/split/split_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 32547841..a8c2e671 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,12 @@ Changelog ========= +Version 0.68.0 +-------------- +* added stratification framework for split balancing + Version 0.67.0 ---------------- +-------------- * added first version of spotlight integration Version 0.66.13 diff --git a/nkululeko/constants.py b/nkululeko/constants.py index 7570ce73..4fadb46b 100644 --- a/nkululeko/constants.py +++ b/nkululeko/constants.py @@ -1,2 +1,2 @@ -VERSION="0.67.0" -SAMPLING_RATE = 16000 \ No newline at end of file +VERSION="0.68.0" +SAMPLING_RATE = 16000 diff --git a/nkululeko/data/dataset.py b/nkululeko/data/dataset.py index e7d71f77..d2558941 100644 --- a/nkululeko/data/dataset.py +++ b/nkululeko/data/dataset.py @@ -7,6 +7,7 @@ import audformat import nkululeko.filter_data as filter import nkululeko.glob_conf as glob_conf + # import audb import pandas as pd from audformat.utils import duration @@ -14,6 +15,7 @@ from nkululeko.plots import Plots from nkululeko.reporting.report_item import ReportItem from nkululeko.util import Util +import nkululeko.split.split_utils as split class Dataset: @@ -33,9 +35,7 @@ def __init__(self, name): self.util = Util("dataset") self.plot = Plots() self.limit = int(self.util.config_val_data(self.name, "limit", 0)) - self.start_fresh = eval( - self.util.config_val("DATA", "no_reuse", "False") - ) + self.start_fresh = eval(self.util.config_val("DATA", "no_reuse", "False")) self.is_labeled, self.got_speaker, self.got_gender, self.got_age = ( False, False, @@ -93,9 +93,7 @@ def load(self): store_file = f"{store}{self.name}.pkl" self.root = self._load_db() if not self.start_fresh and os.path.isfile(store_file): - self.util.debug( - f"{self.name}: reusing previously stored file {store_file}" - ) + self.util.debug(f"{self.name}: reusing previously stored file {store_file}") self.df = pd.read_pickle(store_file) self.is_labeled = self.target in self.df self.got_gender = "gender" in self.df @@ -110,14 +108,10 @@ def load(self): # map the audio file paths self.db.map_files(lambda x: os.path.join(self.root, x)) # the dataframes (potentially more than one) with at least the file names - df_files = self.util.config_val_data( - self.name, "files_tables", "['files']" - ) + df_files = self.util.config_val_data(self.name, "files_tables", "['files']") df_files_tables = ast.literal_eval(df_files) # The label for the target column - self.col_label = self.util.config_val_data( - self.name, "label", self.target - ) + self.col_label = self.util.config_val_data(self.name, "label", self.target) ( df, self.is_labeled, @@ -255,9 +249,7 @@ def _get_df_for_lists(self, db, df_files): pass try: # same for the target, e.g. 
"age" - df_local[self.target] = db[table]["speaker"].get( - map=self.target - ) + df_local[self.target] = db[table]["speaker"].get(map=self.target) is_labeled = True except (ValueError, audformat.core.errors.BadKeyError) as e: pass @@ -281,29 +273,25 @@ def split(self): if os.path.isfile(storage_train) and os.path.isfile(storage_test): # if self.util.config_val_data(self.name, 'test_tables', False): self.util.debug( - "splits: reusing previously stored test file" - f" {storage_test}" + "splits: reusing previously stored test file" f" {storage_test}" ) self.df_test = pd.read_pickle(storage_test) self.util.debug( - "splits: reusing previously stored train file" - f" {storage_train}" + "splits: reusing previously stored train file" f" {storage_train}" ) self.df_train = pd.read_pickle(storage_train) return elif os.path.isfile(storage_train): self.util.debug( - "splits: reusing previously stored train file" - f" {storage_train}" + "splits: reusing previously stored train file" f" {storage_train}" ) self.df_train = pd.read_pickle(storage_train) self.df_test = pd.DataFrame() return elif os.path.isfile(storage_test): self.util.debug( - "splits: reusing previously stored test file" - f" {storage_test}" + "splits: reusing previously stored test file" f" {storage_test}" ) self.df_test = pd.read_pickle(storage_test) self.df_train = pd.DataFrame() @@ -314,9 +302,7 @@ def split(self): traindf = self.db.tables[self.target + ".train"].df # use only the train and test samples that were not perhaps filtered out by an earlier processing step self.df_test = self.df.loc[self.df.index.intersection(testdf.index)] - self.df_train = self.df.loc[ - self.df.index.intersection(traindf.index) - ] + self.df_train = self.df.loc[self.df.index.intersection(traindf.index)] elif split_strategy == "train": self.df_train = self.df self.df_test = pd.DataFrame() @@ -339,26 +325,18 @@ def split(self): if entry_train_tables: train_tables = ast.literal_eval(entry_train_tables) for train_table in train_tables: - traindf = pd.concat( - [traindf, self.db.tables[train_table].df] - ) + traindf = pd.concat([traindf, self.db.tables[train_table].df]) # use only the train and test samples that were not perhaps filtered out by an earlier processing step # testdf.index.map(lambda x: os.path.join(self.root, x)) # testdf.index = testdf.index.to_series().apply(lambda x: self.root+x) testdf = testdf.set_index( - audformat.utils.to_segmented_index( - testdf.index, allow_nat=False - ) + audformat.utils.to_segmented_index(testdf.index, allow_nat=False) ) traindf = traindf.set_index( - audformat.utils.to_segmented_index( - traindf.index, allow_nat=False - ) + audformat.utils.to_segmented_index(traindf.index, allow_nat=False) ) self.df_test = self.df.loc[self.df.index.intersection(testdf.index)] - self.df_train = self.df.loc[ - self.df.index.intersection(traindf.index) - ] + self.df_train = self.df.loc[self.df.index.intersection(traindf.index)] # it might be necessary to copy the target values try: self.df_test[self.target] = testdf[self.target] @@ -368,6 +346,8 @@ def split(self): self.df_train[self.target] = traindf[self.target] except KeyError: pass # if the dataframe is empty + elif split_strategy == "balanced": + self.balanced_split() elif split_strategy == "speaker_split": self.split_speakers() elif split_strategy == "random": @@ -398,11 +378,73 @@ def finish_up(self, df, storage): df.to_pickle(storage) return df + def balanced_split(self): + """One way to split train and eval sets: Generate split dataframes for some balancing criterion""" + 
seed = 42 + k = 30 + test_size = int(self.util.config_val_data(self.name, "test_size", 20)) / 100.0 + df = self.df + # split target + targets = df[self.target].to_numpy() + # + bins = self.util.config_val("DATA", f"bin", False) + if bins: + nbins = len(ast.literal_eval(bins)) + targets = split.binning(targets, nbins=nbins) + # on which variable to split + speakers = df["speaker"].to_numpy() + + # on which variables (targets, groupings) to stratify + stratif_vars = self.util.config_val("DATA", f"balance", False) + stratif_vars_array = {} + if not stratif_vars: + self.util.error("balanced split needs stratif_vars to stratify the splits") + else: + stratif_vars = ast.literal_eval(stratif_vars) + for stratif_var in stratif_vars.keys(): + if stratif_var == self.target: + stratif_vars_array[self.target] = targets + continue + else: + data = df[stratif_var].to_numpy() + bins = self.util.config_val("DATA", f"{stratif_var}_bins", False) + if bins: + data = split.binning(data, nbins=int(bins)) + stratif_vars_array[stratif_var] = data + # weights for all stratify_on variables and + # and for test proportion match. Give target + # variable EMOTION more weight than groupings. + size_diff = int(self.util.config_val("DATA", f"size_diff_weight", "1")) + weights = { + "size_diff": size_diff, + } + for key, value in stratif_vars.items(): + weights[key] = value + # find optimal test indices TEST_I in DF + # info: dict with goodness of split information + train_i, test_i, info = split.optimize_traintest_split( + X=df, + y=targets, + split_on=speakers, + stratify_on=stratif_vars_array, + weight=weights, + test_size=test_size, + k=k, + seed=seed, + ) + self.util.debug(f"stratification info;\n{info}") + self.df_train = df.iloc[train_i] + self.df_test = df.iloc[test_i] + self.util.debug( + f"{self.name} (balanced split): [{self.df_train.shape[0]}/{self.df_test.shape[0]}]" + " samples in train/test" + ) + # because this generates new train/test sample quantaties, the feature extraction has to be done again + glob_conf.config["FEATS"]["needs_feature_extraction"] = "True" + def split_speakers(self): """One way to split train and eval sets: Specify percentage of evaluation speakers""" - test_percent = int( - self.util.config_val_data(self.name, "test_size", 20) - ) + test_percent = int(self.util.config_val_data(self.name, "test_size", 20)) df = self.df s_num = df.speaker.nunique() test_num = int(s_num * (test_percent / 100)) @@ -410,7 +452,7 @@ def split_speakers(self): self.df_test = df[df.speaker.isin(test_spkrs)] self.df_train = df[~df.index.isin(self.df_test.index)] self.util.debug( - f"{self.name}: [{self.df_train.shape[0]}/{self.df_test.shape[0]}]" + f"{self.name} (speaker split): [{self.df_train.shape[0]}/{self.df_test.shape[0]}]" " samples in train/test" ) # because this generates new train/test sample quantaties, the feature extraction has to be done again @@ -418,9 +460,7 @@ def split_speakers(self): def random_split(self): """One way to split train and eval sets: Specify percentage of random samples""" - test_percent = int( - self.util.config_val_data(self.name, "test_size", 20) - ) + test_percent = int(self.util.config_val_data(self.name, "test_size", 20)) df = self.df s_num = len(df) test_num = int(s_num * (test_percent / 100)) @@ -442,49 +482,47 @@ def _add_labels(self, df): return df def prepare_labels(self): - strategy = self.util.config_val("DATA", "strategy", "train_test") + # strategy = self.util.config_val("DATA", "strategy", "train_test") only_tests = eval(self.util.config_val("DATA", "tests", 
"False")) - if strategy == "cross_data" or only_tests: - self.df = self.map_labels(self.df) - # Bin target values if they are continuous but a classification experiment should be done - self.map_continuous_classification(self.df) - self.df = self._add_labels(self.df) - if self.util.config_val_data(self.name, "value_counts", False): - if not self.got_gender or not self.got_speaker: - self.util.error( - "can't plot value counts if no speaker or gender is" - " given" - ) - else: - self.plot.describe_df( - self.name, self.df, self.target, f"{self.name}_distplot" - ) - elif strategy == "train_test": - self.df_train = self.map_labels(self.df_train) - self.df_test = self.map_labels(self.df_test) - self.map_continuous_classification(self.df_train) - self.map_continuous_classification(self.df_test) - self.df_train = self._add_labels(self.df_train) - self.df_test = self._add_labels(self.df_test) - if self.util.config_val_data(self.name, "value_counts", False): - if not self.got_gender or not self.got_speaker: - self.util.error( - "can't plot value counts if no speaker or gender is" - " given" - ) - else: - self.plot.describe_df( - self.name, - self.df_train, - self.target, - f"{self.name}_train_distplot", - ) - self.plot.describe_df( - self.name, - self.df_test, - self.target, - f"{self.name}_test_distplot", - ) + # if strategy == "cross_data" or only_tests: + # self.df = self.map_labels(self.df) + # # Bin target values if they are continuous but a classification experiment should be done + # self.map_continuous_classification(self.df) + # self.df = self._add_labels(self.df) + # if self.util.config_val_data(self.name, "value_counts", False): + # if not self.got_gender or not self.got_speaker: + # self.util.error( + # "can't plot value counts if no speaker or gender is" " given" + # ) + # else: + # self.plot.describe_df( + # self.name, self.df, self.target, f"{self.name}_distplot" + # ) + # elif strategy == "train_test": + self.df_train = self.map_labels(self.df_train) + self.df_test = self.map_labels(self.df_test) + self.map_continuous_classification(self.df_train) + self.map_continuous_classification(self.df_test) + self.df_train = self._add_labels(self.df_train) + self.df_test = self._add_labels(self.df_test) + if self.util.config_val_data(self.name, "value_counts", False): + if not self.got_gender or not self.got_speaker: + self.util.error( + "can't plot value counts if no speaker or gender is" " given" + ) + else: + self.plot.describe_df( + self.name, + self.df_train, + self.target, + f"{self.name}_train_distplot", + ) + self.plot.describe_df( + self.name, + self.df_test, + self.target, + f"{self.name}_test_distplot", + ) def map_labels(self, df): pd.options.mode.chained_assignment = None @@ -521,9 +559,7 @@ def check_continuous_classification(self): def map_continuous_classification(self, df): """Map labels to bins for continuous data that should be classified""" if self.check_continuous_classification(): - self.util.debug( - f"{self.name}: binning continuous variable to categories" - ) + self.util.debug(f"{self.name}: binning continuous variable to categories") cat_vals = self.util.continuous_to_categorical(df[self.target]) df[self.target] = cat_vals labels = ast.literal_eval(glob_conf.config["DATA"]["labels"]) diff --git a/nkululeko/split/__init__.py b/nkululeko/split/__init__.py new file mode 100644 index 00000000..45629229 --- /dev/null +++ b/nkululeko/split/__init__.py @@ -0,0 +1,3 @@ +from nkululeko.constants import VERSION + +__version__ = VERSION diff --git 
a/nkululeko/split/example_binning.py b/nkululeko/split/example_binning.py new file mode 100644 index 00000000..31cca642 --- /dev/null +++ b/nkululeko/split/example_binning.py @@ -0,0 +1,27 @@ +""" +Code copyright by Uwe Reichel +""" + +import numpy as np +from split_utils import binning, optimize_traindevtest_split + +np.random.seed(42) +y = np.random.rand(10) + +# intrinsic binning by equidistant percentiles +yci = binning(y, nbins=3) + +# extrinsic binning by explicit lower boundaries +yce = binning(y, lower_boundaries=[0, 0.3, 0.8]) + +print("yci:", yci) +print("yce:", yce) + +""" + yci: [0 2 2 1 0 0 0 2 1 2] + yce: [1 2 1 1 0 0 0 2 1 1] + + now yci or yce can be used for stratification, e.g. +stratify_on = {"target": yci, ...} +... = optimize_traindevtest_split(..., y=y, stratify_on=stratify_on, ...) +""" diff --git a/nkululeko/split/example_trainDevTestSplit.py b/nkululeko/split/example_trainDevTestSplit.py new file mode 100644 index 00000000..a6b6413b --- /dev/null +++ b/nkululeko/split/example_trainDevTestSplit.py @@ -0,0 +1,81 @@ +""" +Code copyright by Uwe Reichel +""" + +# import json +import pandas as pd +import audb +from split_utils import optimize_traindevtest_split + +# define train/dev/testset split on emodb, that is: +# - speaker disjunct +# - optimally stratified on emotion +# - optimally stratified on gender +# - optimally stratified on transcriptions +# - that contains 10% of the speakers in both dev and test set +# - and approximately 10% of the files in both dev and test set + + +# data +db = audb.load( + "emodb", version="1.3.0", format="wav", sampling_rate=16000, mixdown=True +) +df_emotion = db["emotion"].get() +df_files = db["files"].get() +df_speaker = db["speaker"].get() + +df = pd.concat([df_emotion, df_files], axis=1, join="inner") + + +def spk2gender(x): + if x in [8, 9, 13, 14, 16]: + return "female" + return "male" + + +df["gender"] = df["speaker"].map(spk2gender) + +# seed, dev and test proportion, number of different splits +seed = 42 +dev_size = 0.2 +test_size = 0.2 +k = 30 + +# targets +emotion = df["emotion"].to_numpy() + +# on which variable to split +speaker = df["speaker"].to_numpy() + +# on which variables (targets, groupings) to stratify +stratif_vars = { + "emotion": emotion, + "gender": df["gender"].to_numpy(), + "transcription": df["transcription"].to_numpy(), +} + +# weights for all stratify_on variables and +# and for dev and test proportion match. Give target +# variable EMOTION more weight than groupings. 
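+# "size_diff" weights how strongly a mismatch between the requested and the
+# achieved dev/test proportions is penalized; any stratification variable
+# missing from this dict would default to weight 1.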
+weight = {"emotion": 2, "gender": 1, "transcription": 1, "size_diff": 1} + +# find optimal dev and test indices DEV_I and TEST_I in DF +# info: dict with goodness of split information +train_i, dev_i, test_i, info = optimize_traindevtest_split( + X=df, + y=emotion, + split_on=speaker, + stratify_on=stratif_vars, + weight=weight, + dev_size=dev_size, + test_size=test_size, + k=k, + seed=seed, +) + +print("dev split of DF:") +print(df.iloc[dev_i]) +print("dev split of target variable:") +print(emotion[dev_i]) +print("goodness of split:") +print(info) diff --git a/nkululeko/split/example_trainTestSplit.py b/nkululeko/split/example_trainTestSplit.py new file mode 100644 index 00000000..10c68e64 --- /dev/null +++ b/nkululeko/split/example_trainTestSplit.py @@ -0,0 +1,77 @@ +""" +Code copyright by Uwe Reichel +""" + +import pandas as pd +import audb +from split_utils import optimize_traintest_split + +# define testset on emodb, that is: +# - speaker disjunct +# - optimally stratified on emotion +# - optimally stratified on gender +# - optimally stratified on transcriptions +# - that contains 10% of the speakers +# - and approximately 10% of the files + +# data +db = audb.load( + "emodb", version="1.3.0", format="wav", sampling_rate=16000, mixdown=True +) +df_emotion = db["emotion"].get() +df_files = db["files"].get() +df_speaker = db["speaker"].get() +df = pd.concat([df_emotion, df_files], axis=1, join="inner") + + +def spk2gender(x): + if x in [8, 9, 13, 14, 16]: + return "female" + return "male" + + +df["gender"] = df["speaker"].map(spk2gender) + +# seed, test proportion, number of different splits +seed = 42 +test_size = 0.2 +k = 30 + +# targets +emotion = df["emotion"].to_numpy() + +# on which variable to split + +speaker = df["speaker"].to_numpy() + +# on which variables (targets, groupings) to stratify +stratif_vars = { + "emotion": emotion, + "gender": df["gender"].to_numpy(), + "transcription": df["transcription"].to_numpy(), +} + +# weights for all stratify_on variables and +# and for test proportion match. Give target +# variable EMOTION more weight than groupings. 
+weight = {"emotion": 2, "gender": 1, "transcription": 1, "size_diff": 1} + +# find optimal test indices TEST_I in DF +# info: dict with goodness of split information +train_i, test_i, info = optimize_traintest_split( + X=df, + y=emotion, + split_on=speaker, + stratify_on=stratif_vars, + weight=weight, + test_size=test_size, + k=k, + seed=seed, +) + +print("test split of DF:") +print(df.iloc[test_i]) +print("test split of target variable:") +print(emotion[test_i]) +print("goodness of split:") +print(info) diff --git a/nkululeko/split/split_utils.py b/nkululeko/split/split_utils.py new file mode 100644 index 00000000..9609e90b --- /dev/null +++ b/nkululeko/split/split_utils.py @@ -0,0 +1,528 @@ +""" +Code copyright by Uwe Reichel +""" + +from collections import Counter +import numpy as np +import pandas as pd +import scipy.spatial as ssp +from sklearn.model_selection import GroupShuffleSplit +import sys + + +def optimize_traindevtest_split( + X, y, split_on, stratify_on, weight=None, dev_size=0.1, test_size=0.1, k=30, seed=42 +): + """optimize group-disjunct split into training, dev, and test set, which is guided by: + - disjunct split of values in SPLIT_ON + - stratification by all keys in STRATIFY_ON (targets and groupings) + - test set proportion in X should be close to test_size (which is the test + proportion in set(split_on)) + + Score to be minimized: (sum_v[w(v) * max_irad(v)] + w(d) * max_d) / (sum_v[w(v)] + w(d)) + (v: variables to be stratified on + w(v): their weight + max_irad(v): maximum information radius of reference distribution of classes in v and + - dev set distribution, + - test set distribution + N(v): number of stratification variables + max_d: maximum of absolute difference between dev and test sizes of X and set(split_on) + w(d): its weight + + Args: + X: (pd.DataFrame) of features/groupings for which best split + is to be calculated. Of shape (N, M) + y: (np.array) of targets of length N + if type(y[0]) in ["str", "int"]: y is assumed to be categorical, so that it is additionally + tested that all partitions cover all classes. Else y is assumed to be numeric and no + coverage test is done. + split_on: (np.array) list of length N with grouping variable (e.g. speaker IDs), + on which the group-disjunct split is to be performed. Must be categorical. + stratify_on: (dict) Dict-keys are variable names (targets and/or further groupings) + the split should be stratified on (groupings could e.g. be sex, age class, etc). + Dict-Values are np.array-s of length N that contain the variable values. All + variables must be categorical. + weight: (dict) weight for each variable in stratify_on. Defines their amount of + contribution to the optimization score. Uniform weighting by default. Additional + key: "size_diff" defines how the corresponding size differences should be weighted. + dev_size: (float) proportion in set(split_on) for dev set, e.g. 
10% of speakers + to be held-out + test_size: (float) test proportion in set(split_on) for test set + k: (int) number of different splits to be tried out + seed: (int) random seed + Returns: + train_i: (np.array) train set indices in X + dev_i: (np.array) dev set indices in X + test_i: (np.array) test set indices in X + info: (dict) detail information about reference and achieved prob distributions + "dev_size_in_spliton": intended grouping dev_size + "dev_size_in_X": optimized dev proportion of observations in X + "test_size_in_spliton": intended grouping test_size + "test_size_in_X": optimized test proportion of observations in X + "p_ref_{c}": reference class distribution calculated from stratify_on[c] + "p_dev_{c}": dev set class distribution calculated from stratify_on[c][dev_i] + "p_test_{c}": test set class distribution calculated from stratify_on[c][test_i] + """ + + # data size + N = len(y) + + # categorical target: number of classes for coverage test + if is_categorical(y[0]): + nc = len(set(y)) + else: + nc = None + + # adjusted dev_size after having split off the test set + dev_size_adj = (dev_size * N) / (N - test_size * N) + + # split all into train/dev vs test + gss_o = GroupShuffleSplit(n_splits=k, test_size=test_size, random_state=seed) + + # split train/dev into train vs dev + gss_i = GroupShuffleSplit(n_splits=k, test_size=dev_size_adj, random_state=seed) + + # set weight defaults + if weight is None: + weight = {} + for c in stratify_on.keys(): + if c not in weight: + weight[c] = 1 + if "size_diff" not in weight: + weight["size_diff"] = 1 + + # stratification reference distributions calculated on stratify_on + p_ref = {} + for c in stratify_on: + p_ref[c] = class_prob(stratify_on[c]) + + # best train/dev/test indices in X; best associated score + train_i, dev_i, test_i, best_sco = None, None, None, np.inf + + # full target coverage in all partitions + full_target_coverage = False + + # brute-force optimization of SPLIT_ON split + # outer loop *_o: splitting into train/dev and test + # inner loop *_i: spltting into train and dev + for tri_o, tei_o in gss_o.split(X, y, split_on): + # current train/dev partition + X_i = X.iloc[tri_o] + y_i = y[tri_o] + split_on_i = split_on[tri_o] + + for tri_i, tei_i in gss_i.split(X_i, y_i, split_on_i): + # all classes maintained in all partitions? 
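+            # (nc is None for numeric targets; for categorical targets every
+            # candidate split must keep all nc classes in train, dev and test,
+            # else it is skipped)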
+ if nc: + nc_train = len(set(y[tri_o[tri_i]])) + nc_dev = len(set(y[tri_o[tei_i]])) + nc_test = len(set(y[tei_o])) + if min(nc_train, nc_dev, nc_test) < nc: + continue + + full_target_coverage = True + + sco = calc_split_score( + test_i=tei_o, + stratify_on=stratify_on, + weight=weight, + p_ref=p_ref, + N=N, + test_size=test_size, + dev_i=tri_o[tei_i], + dev_size=dev_size_adj, + ) + + if sco < best_sco: + best_sco = sco + test_i = tei_o + train_i = tri_o[tri_i] + dev_i = tri_o[tei_i] + + if test_i is None: + sys.exit(exit_message(full_target_coverage, "dev and test")) + + # matching info + info = { + "score": best_sco, + "size_devset_in_spliton": dev_size, + "size_devset_in_X": np.round(len(dev_i) / N, 2), + "size_testset_in_spliton": test_size, + "size_testset_in_X": np.round(len(test_i) / N, 2), + } + + for c in p_ref: + info[f"p_{c}_ref"] = p_ref[c] + info[f"p_{c}_dev"] = class_prob(stratify_on[c][dev_i]) + info[f"p_{c}_test"] = class_prob(stratify_on[c][test_i]) + + return train_i, dev_i, test_i, info + + +def optimize_traintest_split( + X, y, split_on, stratify_on, weight=None, test_size=0.1, k=30, seed=42 +): + """optimize group-disjunct split which is guided by: + - disjunct split of values in SPLIT_ON + - stratification by all keys in STRATIFY_ON (targets and groupings) + - test set proportion in X should be close to test_size (which is the test + proportion in set(split_on)) + + Score to be minimized: (sum_v[w(v) * irad(v)] + w(d) * d) / (sum_v[w(v)] + w(d)) + (v: variables to be stratified on + w(v): their weight + irad(v): information radius between reference distribution of classes in v + and test set distribution + N(v): number of stratification variables + d: absolute difference between test sizes of X and set(split_on) + w(d): its weight + + Args: + X: (pd.DataFrame) of features/groupings for which best split + is to be calculated. Of shape (N, M) + y: (np.array) of targets of length N + if type(y[0]) in ["str", "int"]: y is assumed to be categorical, so that it is additionally + tested that all partitions cover all classes. Else y is assumed to be numeric and no + coverage test is done. + split_on: (np.array) list of length N with grouping variable (e.g. speaker IDs), + on which the group-disjunct split is to be performed. Must be categorical. + stratify_on: (dict) Dict-keys are variable names (targets and/or further groupings) + the split should be stratified on (groupings could e.g. be sex, age class, etc). + Dict-Values are np.array-s of length N that contain the variable values. All + variables must be categorical. + weight: (dict) weight for each variable in stratify_on. Defines their amount of + contribution to the optimization score. Uniform weighting by default. Additional + key: "size_diff" defines how test size diff should be weighted. + test_size: (float) test proportion in set(split_on), e.g. 
10% of speakers to be held-out + k: (int) number of different splits to be tried out + seed: (int) random seed + Returns: + train_i: (np.array) train set indices in X + test_i: (np.array) test set indices in X + info: (dict) detail information about reference and achieved prob distributions + "size_testset_in_spliton": intended test_size + "size_testset_in_X": optimized test proportion in X + "p_ref_{c}": reference class distribution calculated from stratify_on[c] + "p_test_{c}": test set class distribution calculated from stratify_on[c][test_i] + """ + + gss = GroupShuffleSplit(n_splits=k, test_size=test_size, random_state=seed) + + # set weight defaults + if weight is None: + weight = {} + for c in stratify_on.keys(): + if c not in weight: + weight[c] = 1 + if "size_diff" not in weight: + weight["size_diff"] = 1 + + # stratification reference distributions calculated on stratify_on + p_ref = {} + for c in stratify_on: + p_ref[c] = class_prob(stratify_on[c]) + + # best train and test indices in X; best associated score + train_i, test_i, best_sco = None, None, np.inf + + # data size + N = len(y) + + # full target coverage in all partitions + full_target_coverage = False + + # categorical target: number of classes for coverage test + if is_categorical(y[0]): + nc = len(set(y)) + else: + nc = None + + # brute-force optimization of SPLIT_ON split + for tri, tei in gss.split(X, y, split_on): + # all classes maintained in all partitions? + if nc: + nc_train = len(set(y[tri])) + nc_test = len(set(y[tei])) + if min(nc_train, nc_test) < nc: + continue + + full_target_coverage = True + + sco = calc_split_score(tei, stratify_on, weight, p_ref, N, test_size) + if sco < best_sco: + train_i, test_i, best_sco = tri, tei, sco + + if test_i is None: + sys.exit(exit_message(full_target_coverage)) + + # matching info + info = { + "score": best_sco, + "size_testset_in_spliton": test_size, + "size_testset_in_X": np.round(len(test_i) / N, 2), + } + + for c in p_ref: + info[f"p_{c}_ref"] = p_ref[c] + info[f"p_{c}_test"] = class_prob(stratify_on[c][test_i]) + + return train_i, test_i, info + + +def calc_split_score( + test_i, stratify_on, weight, p_ref, N, test_size, dev_i=None, dev_size=None +): + """calculate split score based on class distribution IRADs and + differences in partition sizes of groups vs observations; smaller is better. + If dev_i and dev_size are not provided, the score is calculated for the train/test + split only. If they are provided the score is calculated for the train/dev/test split + Args: + test_i: (np.array) of test set indices + stratify_on: (dict) Dict-keys are variable names (targets and/or further groupings) + the split should be stratified on (groupings could e.g. be sex, age class, etc). + Dict-Values are np.array-s of length N that contain the variable values. + weight: (dict) weight for each variable in stratify_on. 
Additional + key: "size_diff" that weights the grouping vs observation level test set size difference + p_ref: (dict) reference class distributions for all variables in stratify_on + N: (int) size of underlying data set + test_size: (float) test proportion in value set of variable, the disjunct grouping + has been carried out + dev_i: (np.array) of dev test indices + dev_size: (float) dev proportion in value set of variable, the disjunct grouping + has been carried out (this value should have been adjusted after splitting off the + test set) + """ + + if dev_i is None: + do_dev = False + else: + do_dev = True + + # dev and test set class distributions + p_test, p_dev = {}, {} + for c in p_ref: + p_test[c] = class_prob(stratify_on[c][test_i]) + if do_dev: + p_dev[c] = class_prob(stratify_on[c][dev_i]) + + # score + sco, wgt = 0, 0 + + # IRADs (if p_test[c] or p_dec[c] do not contain + # all classes in p_ref[c], return INF) + for c in p_ref: + irad, full_coverage = calc_irad(p_ref[c], p_test[c]) + if not full_coverage: + return np.inf + if do_dev: + irad_dev, full_coverage = calc_irad(p_ref[c], p_dev[c]) + if not full_coverage: + return np.inf + irad = max(irad, irad_dev) + + sco += weight[c] * irad + wgt += weight[c] + + # partition size difference groups vs observations + size_diff = np.abs(len(test_i) / N - test_size) + if do_dev: + size_diff_dev = np.abs(len(dev_i) / N - dev_size) + size_diff = max(size_diff, size_diff_dev) + + sco += weight["size_diff"] * size_diff + wgt += weight["size_diff"] + + sco /= wgt + + return sco + + +def calc_irad(p1, p2): + """calculate information radius of prob dicts p1 and p2 + Args: + p1, p2: (dict) of probabilities + Returns: + ir: (float) information radius + full_coverage: (bool) True if all elements in p1 occur in p2 + and vice versa + """ + + p, q = [], [] + full_coverage = True + + for u in sorted(p1.keys()): + if u not in p2: + full_coverage = False + a = 0.0 + else: + a = p2[u] + + p.append(p1[u]) + q.append(a) + + if full_coverage: + if len(p2.keys()) > len(p1.keys()): + full_coverage = False + + irad = ssp.distance.jensenshannon(p, q) + + return irad, full_coverage + + +def class_prob(y): + """returns class probabilities in y + Args: + y (array-like) of classes + Returns: + p (dict) assigning to each class in Y its maximum likelihood + """ + + p = {} + N = len(y) + c = Counter(y) + for x in c: + p[x] = c[x] / N + + return p + + +def is_categorical(x): + """returns True if type of x is in str or int*, + else False""" + + if type(x) in [ + str, + int, + np.int16, + np.int32, + np.int64, + np.uint8, + np.uint16, + np.uint32, + ]: + return True + return False + + +def dummy_variable(X, columns, specs=None, squeeze_classes=False): + """ + creates dummy variable from binned numeric columns that can be used + later for stratification etc. + + Args: + X: (pd.DataFrame) + columns: (str or list) of numeric column names + specs: (dict or str) + if nested dict: keys are column names with subdict that contains the + arguments for binning(), i.e. n_bins and lower_boundaries + squeeze_classes: (boolean) further squeeze classes by sorting the digits + within the string. + Example: from binning of 3 columns, each into 2 bins, we got + "000", "100", "010", "001", "110", "101", "011", "111". 
+ These classes are further squeezed by within-string sorting: + "000", "001", "011", "111" + + Returns: + y: (list) of class strings of length X.shape[0] + + """ + + df_bin = pd.DataFrame() + if specs is None: + specs = {} + if type(columns) is str: + columns = [columns] + + # bin columns + for col in columns: + if col not in X.columns: + sys.exit(f"column {col} not in dataframe") + if col in specs: + kwargs = specs[col] + else: + kwargs = {"nbins": 2} + yc = binning(X[col].to_numpy(), **kwargs) + df_bin[col] = yc.astype(str) + + # concatenate + df_bin["binvar"] = "" + for col in columns: + df_bin["binvar"] += df_bin[col] + + # squeeze + if squeeze_classes: + + def squeezing(x): + return "".join(sorted(x)) + + df_bin["binvar"] = df_bin["binvar"].apply(squeezing) + + y = df_bin["binvar"].tolist() + return y + + +def binning(y, nbins=3, lower_boundaries=None): + """ + bins numeric array y either intrinsically into nbins classes + based on an equidistant percentile split, or extrinsically + by using the lower_boundaries values. + + Args: + y: (np.array) with numeric data + nbins: (int) number of bins + lower_boundaries: (list) of lower bin boundaries. + If provided nbins will be ignored and y is binned + extrinsically. The first value of lower_boundaries + is always corrected not to be higher than min(y). + Returns: + yc: (np.array) with bin IDs (integers from 0 to nbins-1) + """ + + # intrinsic binning by equidistant percentiles + if lower_boundaries is None: + prct = np.linspace(0, 100, nbins + 1) + lower_boundaries = np.percentile(y, prct) + lower_boundaries = lower_boundaries[0:nbins] + else: + # make sure that entire range of y is covered + lower_boundaries[0] = min(lower_boundaries[0], np.min(y)) + + # binned array + yc = np.zeros(len(y), dtype=int) + for i in range(1, len(lower_boundaries)): + yc[y >= lower_boundaries[i]] = i + + return yc + + +def optimize_testset_split( + X, y, split_on, stratify_on, weight=None, test_size=0.1, k=30, seed=42 +): + """backward compatibility""" + return optimize_traintest_split( + X, y, split_on, stratify_on, weight, test_size, k, seed + ) + + +def exit_message(full_target_coverage, infx="test"): + if not full_target_coverage: + return ( + "not all partitions contain all target classes. What you can do:\n" + "(1) increase your dev and/or test partition, or\n" + "(2) reduce the amount of target classes by merging some of them." + ) + + return ( + f"\n:-o No {infx} set split found. Reason is, that for at least one of the\n" + f"stratification variables not all its values can make it into the {infx} set.\n" + f"This happens e.g. if the {infx} set size is chosen too small or\n" + "if the (multidimensional) distribution of the stratification\n" + "variables is sparse. What you can do:\n" + "(1) remove a variable from this stratification, or\n" + "(2) merge classes within a variable to increase the per class probabilities, or\n" + f"(3) increase the {infx} set size, or\n" + "(4) increase the number of different splits (if it was small, say < 10, before), or\n" + "(5) in case your target is numeric and you have added a binned target array to the\n" + " stratification variables: reduce the number of bins.\n" + "Good luck!\n" + )
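
Note: the example scripts above fetch emodb via audb. For a quick,
dependency-light smoke test of the new split API, a minimal sketch on
synthetic data could look like this (column names and values are
illustrative only, not part of the patch):

import numpy as np
import pandas as pd
from nkululeko.split.split_utils import binning, optimize_traintest_split

rng = np.random.default_rng(42)
n = 200
df = pd.DataFrame(
    {
        "speaker": rng.integers(0, 20, n),           # 20 synthetic speakers
        "age": rng.uniform(18, 80, n),                # numeric target
        "gender": rng.choice(["female", "male"], n),  # grouping variable
    }
)

# numeric target: bin it first so that the coverage test and the
# stratification operate on categories
age_binned = binning(df["age"].to_numpy(), nbins=3)

train_i, test_i, info = optimize_traintest_split(
    X=df,
    y=age_binned,                          # categorical after binning
    split_on=df["speaker"].to_numpy(),     # speaker-disjunct split
    stratify_on={"age": age_binned, "gender": df["gender"].to_numpy()},
    weight={"age": 2, "gender": 1, "size_diff": 1},
    test_size=0.2,
    k=30,
    seed=42,
)
print(info)

Binning the numeric target up front matters twice here: it enables the
per-class coverage test in optimize_traintest_split (which is skipped for
numeric targets) and it gives the stratification a categorical
distribution to match.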