refactor(KDP): adding crosses and bucketized cols
piotrlaczkowski committed Mar 10, 2024
1 parent 2b1fa04 commit 570553d
Showing 1 changed file with 84 additions and 49 deletions.
133 changes: 84 additions & 49 deletions kdp/idea.py
@@ -194,17 +194,17 @@ def create_integer_lookup_layer(vocabulary: list[int], num_oov_indices: int, nam
)

@staticmethod
def create_crossing_layer(keys: list, depth: int, name: str) -> tf.keras.layers.Layer:
def create_crossing_layer(nr_bins: int, name: str) -> tf.keras.layers.Layer:
"""Create a crossing layer.
Args:
keys: The keys.
depth: The depth.
nr_bins: The number of hash bins for the crossing.
name: The name of the layer.
"""
return tf.keras.layers.Crossing(
keys=keys,
depth=depth,
return tf.keras.layers.HashedCrossing(
num_bins=nr_bins,
output_mode="int",
sparse=False,
name=name,
)
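For context, a minimal standalone sketch of the tf.keras.layers.HashedCrossing layer this factory wraps (not part of the diff; feature names and values are illustrative):

```python
# Sketch: crossing two categorical features with HashedCrossing,
# mirroring the arguments used by create_crossing_layer above.
import tensorflow as tf

cross = tf.keras.layers.HashedCrossing(num_bins=10, output_mode="int", sparse=False, name="demo_cross")

country = tf.constant([["US"], ["FR"], ["US"]])
device = tf.constant([["mobile"], ["desktop"], ["desktop"]])

# each (country, device) pair is hashed into one of 10 buckets -> shape (3, 1) integer indices
crossed = cross((country, device))
```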

@@ -256,7 +256,7 @@ def __init__(
self.categorical_features = categorical_features or []
self.category_encoding_option = category_encoding_option
self.features_stats_path = features_stats_path or "features_stats.json"
self.feature_crosses = feature_crosses or [] # Store the feature crosses
self.feature_crosses = feature_crosses or []
self.numeric_feature_buckets = numeric_feature_buckets or {}
self.output_mode = output_mode
self.overwrite_stats = overwrite_stats
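The two collections stored above are later consumed as (feature_a, feature_b, nr_bins) triples and as a feature-to-boundaries mapping, so a configuration could plausibly look like the following (names and values are hypothetical, not from the repository):

```python
# Hypothetical constructor arguments matching how the attributes are unpacked later on.
feature_crosses = [("country", "device", 16)]          # (feature_a, feature_b, nr_bins)
numeric_feature_buckets = {"age": [18.0, 35.0, 60.0]}  # feature name -> bucket boundaries
```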
Expand All @@ -267,13 +267,15 @@ def __init__(
self.inputs = {}
self.signature = {}
self.outputs = {}
self.output_dims = 0

if log_to_file:
logger.info("Logging to file enabled 🗂️")
logger.add("PreprocessModel.log")

# Initializing Data Stats object
# TODO: add crosses and buckets into stats as well
# we only need numeric and cat features stats for layers
# crosses and bucketized features do not need stats-based layer init
self.stats_instance = DatasetStatistics(
path_data=self.path_data,
numeric_cols=self.numeric_features,
@@ -349,6 +351,8 @@ def _add_pipeline_numeric(self, feature_name: str, input_layer, stats: dict) ->
name=f"norm_{feature_name}",
)
self.outputs[feature_name] = preprocessor.chain(input_layer=input_layer)
# updating output vector dim
self.output_dims += 1

def _add_pipeline_categorical(self, feature_name: str, input_layer, stats: dict) -> None:
"""Add a categorical preprocessing step to the pipeline.
@@ -401,54 +405,80 @@ def _add_pipeline_categorical(self, feature_name: str, input_layer, stats: dict)
# adding outputs
# self.outputs[feature_name] = preprocessor.preprocess(input_tensor)
self.outputs[feature_name] = preprocessor.chain(input_layer=input_layer)
# updating output vector dim
self.output_dims += emb_size

def _add_pipeline_bucketize(self, feature_name: str, input_layer, stats: dict) -> None:
def _add_pipeline_bucketize(self, feature_name: str, input_layer) -> None:
"""Add a bucketization preprocessing step to the pipeline.
Args:
feature_name (str): The name of the feature to be preprocessed.
input_layer: The input layer for the feature.
stats (dict): A dictionary containing the metadata of the feature, including
the boundaries of the buckets.
"""
boundaries = stats["boundaries"]
preprocessor = FeaturePreprocessor(name=feature_name)
for feature_name, boundaries in self.numeric_feature_buckets.items():
logger.info(f"Adding bucketized {feature_name = } 🪣")
preprocessor = FeaturePreprocessor(name=feature_name)

preprocessor.add_processing_step(
layer_creator=PreprocessorLayerFactory.create_discretization_layer,
boundaries=boundaries,
name=f"bucketize_{feature_name}",
)
preprocessor.add_processing_step(
layer_creator=PreprocessorLayerFactory.create_category_encoding_layer,
num_tokens=len(boundaries) + 1,
output_mode="one_hot",
name=f"one_hot_{feature_name}",
)
self.outputs[feature_name] = preprocessor.chain(input_layer=input_layer)
# checking inputs
_input = self.inputs.get(feature_name)
if not _input:
self._add_input_column(feature_name=feature_name, dtype=tf.float32)

def _add_pipeline_cross(self, feature_name: str, stats: dict) -> None:
preprocessor.add_processing_step(
layer_creator=PreprocessorLayerFactory.create_discretization_layer,
boundaries=boundaries,
name=f"bucketize_{feature_name}",
)
preprocessor.add_processing_step(
layer_creator=PreprocessorLayerFactory.create_category_encoding_layer,
num_tokens=len(boundaries) + 1,
output_mode="one_hot",
name=f"one_hot_{feature_name}",
)
_output_pipe = preprocessor.chain(input_layer=self.inputs[feature_name])
# cast the bucketized feature to float32
self.outputs[feature_name] = tf.cast(_output_pipe, tf.float32)
# updating output vector dim
self.output_dims += len(boundaries) + 1
logger.info("Bucketized Column ✅")

def _add_pipeline_cross(self, stats: dict) -> None:
"""Add a crossing preprocessing step to the pipeline.
Args:
feature_name (str): The name of the feature to be crossed.
stats (dict): A dictionary containing the metadata of the feature, including
the list of features it is crossed with and the depth of the crossing.
"""
crossed_features = stats["crossed_with"]
for cross_feature, depth in crossed_features.items():
preprocessor = FeaturePreprocessor(name=f"{feature_name}_x_{cross_feature}")
# Note: The Crossing layer is hypothetical and not actually part of TensorFlow's Keras API.
# You would need to implement a custom layer or use feature engineering before this step.
for feature_a, feature_b, nr_bins in self.feature_crosses:
preprocessor = FeaturePreprocessor(name=f"{feature_a}_x_{feature_b}")
# checking existence of inputs
input_a = self.inputs.get(feature_a)
input_b = self.inputs.get(feature_b)

logger.debug(f"input_a: {input_a}")
logger.debug(f"input_b: {input_b}")

# checking inputs existence for feature A
if input_a is None:
logger.info(f"Creating {feature_a} inputs and signature")
_col_dtype = stats[feature_a].get("dtype")
self._add_input_column(feature_name=feature_a, dtype=_col_dtype)

# checking inputs existence for feature B
if input_b is None:
logger.info(f"Creating {feature_b} inputs and signature")
_col_dtype = stats[feature_b].get("dtype")
self._add_input_column(feature_name=feature_b, dtype=_col_dtype)

preprocessor.add_processing_step(
layer_creator=PreprocessorLayerFactory.create_crossing_layer,
keys=[feature_name, cross_feature],
depth=depth,
name=f"cross_{feature_name}_{cross_feature}",
nr_bins=nr_bins,
name=f"cross_{feature_a}_{feature_b}",
)
# Assuming the inputs dictionary already contains the features to be crossed
crossed_input = [self.inputs[feature_name], self.inputs[cross_feature]]
self.outputs[f"{feature_name}_x_{cross_feature}"] = preprocessor.chain(input_data=crossed_input)
crossed_input = [self.inputs[feature_a], self.inputs[feature_b]]
self.outputs[f"{feature_a}_x_{feature_b}"] = preprocessor.chain(input_data=crossed_input)
# updating output vector dim (crossed feature spans nr_bins buckets)
self.output_dims += nr_bins
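A minimal standalone sketch of this crossing step in a functional model; the widening to nr_bins dimensions implied by the output_dims update is made explicit here with a one-hot step (all feature names are illustrative):

```python
# Sketch: cross two Keras string inputs, then one-hot encode the hashed index.
import tensorflow as tf

nr_bins = 16
country = tf.keras.Input(shape=(1,), dtype=tf.string, name="country")
device = tf.keras.Input(shape=(1,), dtype=tf.string, name="device")

crossed = tf.keras.layers.HashedCrossing(
    num_bins=nr_bins, output_mode="int", name="cross_country_device"
)((country, device))

# widen the crossed index to nr_bins dimensions, matching the output_dims bookkeeping
one_hot = tf.keras.layers.CategoryEncoding(num_tokens=nr_bins, output_mode="one_hot")(crossed)

model = tf.keras.Model(inputs=[country, device], outputs=one_hot)  # output shape (None, 16)
```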

def _prepare_outputs(self) -> None:
"""Preparing the outputs of the model."""
@@ -474,6 +504,7 @@ def build_preprocessor(self) -> tf.keras.Model:
logger.info("No input features_stats detected !")
self.features_stats = self.stats_instance.main()

# NUMERICAL AND CATEGORICAL FEATURES
for feature_name, stats in self.features_stats.items():
dtype = stats.get("dtype")
logger.info(f"Processing {feature_name = }, {dtype = } 📊")
@@ -482,29 +513,33 @@ def build_preprocessor(self) -> tf.keras.Model:
self._add_input_signature(feature_name=feature_name, dtype=dtype)
input_layer = self.inputs[feature_name]

# NUMERIC FEATURES
if "mean" in stats:
self._add_pipeline_numeric(
feature_name=feature_name,
input_layer=input_layer,
stats=stats,
)
# CATEGORICAL FEATURES
elif "vocab" in stats:
self._add_pipeline_categorical(
feature_name=feature_name,
input_layer=input_layer,
stats=stats,
)
elif "boundaries" in stats:
self._add_pipeline_bucketize(
feature_name=feature_name,
input_layer=input_layer,
stats=stats,
)
if feature_name in self.features_stats and "crossed_with" in stats:
self._add_pipeline_cross(
feature_name=feature_name,
stats=stats,
)
# BUCKETIZED NUMERIC FEATURES
if self.numeric_feature_buckets:
self._add_pipeline_bucketize(
feature_name=feature_name,
input_layer=input_layer,
)
# CROSSING FEATURES
if self.feature_crosses:
self._add_pipeline_cross(
stats=self.features_stats,
)
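The "mean" / "vocab" dispatch above implies a features_stats layout roughly like the following (keys and values are assumptions for illustration, not taken from the repository):

```python
import tensorflow as tf

# hypothetical features_stats shape implied by the dispatch above
features_stats = {
    "age": {"dtype": tf.float32, "mean": 38.2, "var": 120.4},          # -> numeric pipeline
    "city": {"dtype": tf.string, "vocab": ["paris", "lyon", "nice"]},  # -> categorical pipeline
}
```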

# building model
logger.info("Building preprocessor Model 🏗️")