From 570553dcf6352d90200493f2e60bf004f4dd0a84 Mon Sep 17 00:00:00 2001
From: piotrlaczkowski
Date: Sun, 10 Mar 2024 23:01:58 +0100
Subject: [PATCH] refactor(KDP): adding crosses and bucketized cols

---
 kdp/idea.py | 133 +++++++++++++++++++++++++++++++++-------------------
 1 file changed, 84 insertions(+), 49 deletions(-)

diff --git a/kdp/idea.py b/kdp/idea.py
index 92a119b..536bcfb 100644
--- a/kdp/idea.py
+++ b/kdp/idea.py
@@ -194,17 +194,17 @@ def create_integer_lookup_layer(vocabulary: list[int], num_oov_indices: int, nam
         )
 
     @staticmethod
-    def create_crossing_layer(keys: list, depth: int, name: str) -> tf.keras.layers.Layer:
+    def create_crossing_layer(nr_bins: int, name: str) -> tf.keras.layers.Layer:
         """Create a crossing layer.
 
         Args:
-            keys: The keys.
-            depth: The depth.
+            nr_bins: Number of hash bins for the crossed output.
             name: The name of the layer.
         """
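+        # NOTE: HashedCrossing hashes the combination of the two crossed
+        # inputs into num_bins buckets; output_mode="int" returns the bucket
+        # index for each example (hash collisions are possible)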
""" - boundaries = stats["boundaries"] - preprocessor = FeaturePreprocessor(name=feature_name) + for feature_name, boundaries in self.numeric_feature_buckets.items(): + logger.info(f"Adding bucketized {feature_name = } 🪣") + preprocessor = FeaturePreprocessor(name=feature_name) - preprocessor.add_processing_step( - layer_creator=PreprocessorLayerFactory.create_discretization_layer, - boundaries=boundaries, - name=f"bucketize_{feature_name}", - ) - preprocessor.add_processing_step( - layer_creator=PreprocessorLayerFactory.create_category_encoding_layer, - num_tokens=len(boundaries) + 1, - output_mode="one_hot", - name=f"one_hot_{feature_name}", - ) - self.outputs[feature_name] = preprocessor.chain(input_layer=input_layer) + # checking inputs + _input = self.inputs.get(feature_name) + if not _input: + self._add_input_column(feature_name=feature_name, dtype=tf.float32) - def _add_pipeline_cross(self, feature_name: str, stats: dict) -> None: + preprocessor.add_processing_step( + layer_creator=PreprocessorLayerFactory.create_discretization_layer, + boundaries=boundaries, + name=f"bucketize_{feature_name}", + ) + preprocessor.add_processing_step( + layer_creator=PreprocessorLayerFactory.create_category_encoding_layer, + num_tokens=len(boundaries) + 1, + output_mode="one_hot", + name=f"one_hot_{feature_name}", + ) + _output_pipe = preprocessor.chain(input_layer=input_layer) + # Cast the crossed feature to float32 + self.outputs[feature_name] = tf.cast(_output_pipe, tf.float32) + # updating output vector dim + self.output_dims += 1 + logger.info("Bucketized Column ✅") + + def _add_pipeline_cross(self, stats: dict) -> None: """Add a crossing preprocessing step to the pipeline. Args: - feature_name (str): The name of the feature to be crossed. stats (dict): A dictionary containing the metadata of the feature, including the list of features it is crossed with and the depth of the crossing. """ - crossed_features = stats["crossed_with"] - for cross_feature, depth in crossed_features.items(): - preprocessor = FeaturePreprocessor(name=f"{feature_name}_x_{cross_feature}") - # Note: The Crossing layer is hypothetical and not actually part of TensorFlow's Keras API. - # You would need to implement a custom layer or use feature engineering before this step. 
+        for feature_a, feature_b, nr_bins in self.feature_crosses:
+            preprocessor = FeaturePreprocessor(name=f"{feature_a}_x_{feature_b}")
+            # checking existence of inputs
+            input_a = self.inputs.get(feature_a)
+            input_b = self.inputs.get(feature_b)
+
+            logger.debug(f"input_a: {input_a}")
+            logger.debug(f"input_b: {input_b}")
+
+            # checking inputs existence for feature A
+            if input_a is None:
+                logger.info(f"Creating inputs and signature for: {feature_a}")
+                _col_dtype = stats[feature_a].get("dtype")
+                self._add_input_column(feature_name=feature_a, dtype=_col_dtype)
+
+            # checking inputs existence for feature B
+            if input_b is None:
+                logger.info(f"Creating inputs and signature for: {feature_b}")
+                _col_dtype = stats[feature_b].get("dtype")
+                self._add_input_column(feature_name=feature_b, dtype=_col_dtype)
+
             preprocessor.add_processing_step(
                 layer_creator=PreprocessorLayerFactory.create_crossing_layer,
-                keys=[feature_name, cross_feature],
-                depth=depth,
-                name=f"cross_{feature_name}_{cross_feature}",
+                nr_bins=nr_bins,
+                name=f"cross_{feature_a}_{feature_b}",
             )
-            # Assuming the inputs dictionary already contains the features to be crossed
-            crossed_input = [self.inputs[feature_name], self.inputs[cross_feature]]
-            self.outputs[f"{feature_name}_x_{cross_feature}"] = preprocessor.chain(input_data=crossed_input)
+            crossed_input = [self.inputs[feature_a], self.inputs[feature_b]]
+            self.outputs[f"{feature_a}_x_{feature_b}"] = preprocessor.chain(input_data=crossed_input)
+            # updating output vector dim (output_mode="int" yields one column)
+            self.output_dims += 1
 
     def _prepare_outputs(self) -> None:
         """Preparing the outputs of the model."""
@@ -474,6 +504,7 @@ def build_preprocessor(self) -> tf.keras.Model:
             logger.info("No input features_stats detected !")
             self.features_stats = self.stats_instance.main()
 
+        # NUMERICAL AND CATEGORICAL FEATURES
         for feature_name, stats in self.features_stats.items():
             dtype = stats.get("dtype")
             logger.info(f"Processing {feature_name = }, {dtype = } 📊")
@@ -482,29 +513,33 @@ def build_preprocessor(self) -> tf.keras.Model:
             self._add_input_signature(feature_name=feature_name, dtype=dtype)
             input_layer = self.inputs[feature_name]
 
+            # NUMERIC FEATURES
             if "mean" in stats:
                 self._add_pipeline_numeric(
                     feature_name=feature_name,
                     input_layer=input_layer,
                     stats=stats,
                 )
+            # CATEGORICAL FEATURES
             elif "vocab" in stats:
                 self._add_pipeline_categorical(
                     feature_name=feature_name,
                     input_layer=input_layer,
                     stats=stats,
                 )
-            elif "boundaries" in stats:
-                self._add_pipeline_bucketize(
-                    feature_name=feature_name,
-                    input_layer=input_layer,
-                    stats=stats,
-                )
-            if feature_name in self.features_stats and "crossed_with" in stats:
-                self._add_pipeline_cross(
-                    feature_name=feature_name,
-                    stats=stats,
-                )
+
+        # BUCKETIZED NUMERIC FEATURES (once, outside the per-feature loop)
+        if self.numeric_feature_buckets:
+            self._add_pipeline_bucketize()
+
+        # CROSSING FEATURES (once, outside the per-feature loop)
+        if self.feature_crosses:
+            self._add_pipeline_cross(stats=self.features_stats)
 
         # building model
         logger.info("Building preprocessor Model 🏗️")
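
A minimal usage sketch of the new options, assuming the class is named
PreprocessModel and that its constructor accepts path_data and
numeric_features keywords (both names inferred from this diff via
logger.add("PreprocessModel.log"), path_data=self.path_data and
numeric_cols=self.numeric_features, so they may differ in the actual module):

    from kdp.idea import PreprocessModel

    pp = PreprocessModel(
        path_data="data/train.csv",
        numeric_features=["age", "income"],
        categorical_features=["city", "country"],
        # one (feature_a, feature_b, nr_bins) tuple per cross; each pair is
        # hashed into nr_bins buckets by HashedCrossing
        feature_crosses=[("city", "country", 32)],
        # feature -> bucket boundaries; the column is discretized, one-hot
        # encoded into len(boundaries) + 1 slots and cast to float32
        numeric_feature_buckets={"income": [20_000.0, 50_000.0, 100_000.0]},
    )
    model = pp.build_preprocessor()

After build_preprocessor() returns, output_dims holds the width of the
concatenated output: 1 per numeric feature, emb_size per categorical feature,
len(boundaries) + 1 per bucketized column and 1 per hashed cross.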