fix(KDP): adding missing text features vocab attribution
piotrlaczkowski committed Apr 19, 2024
1 parent 2760a3c · commit d9cbc13
Showing 3 changed files with 79 additions and 4 deletions.
kdp/custom_layers.py (1 change: 0 additions & 1 deletion)
@@ -29,7 +29,6 @@ def call(self, x: tf.Tensor) -> tf.Tensor:
         """
         x = tf.strings.lower(x)
         x = tf.strings.regex_replace(x, f"[{self.punctuation_pattern}]", " ")
-        x = tf.strings.regex_replace(x, r"[\d+]", " ")
         stop_words_regex = rf"\b({self.stop_words_pattern})\b\s?"
         x = tf.strings.regex_replace(x, stop_words_regex, " ")
         x = tf.strings.regex_replace(x, r"\s+", " ")
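The removed line used to blank out digits; after this commit, tokens containing numbers survive the cleaning chain. A minimal standalone sketch of what the remaining chain does, with illustrative punctuation and stop-word patterns (the real layer reads self.punctuation_pattern and self.stop_words_pattern from its own configuration):

import tensorflow as tf

# Illustrative assumptions; KDP builds these patterns from the layer config.
punctuation_pattern = r"!,\.;:\?"
stop_words_pattern = "the|a|is"

x = tf.constant(["The KDP library is great, really!"])
x = tf.strings.lower(x)                                            # normalize case
x = tf.strings.regex_replace(x, f"[{punctuation_pattern}]", " ")   # punctuation -> space
stop_words_regex = rf"\b({stop_words_pattern})\b\s?"
x = tf.strings.regex_replace(x, stop_words_regex, " ")             # drop stop words
x = tf.strings.regex_replace(x, r"\s+", " ")                       # collapse whitespace
print(x.numpy())  # [b' kdp library great really ']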
kdp/processor.py (12 changes: 10 additions & 2 deletions)
@@ -181,6 +181,7 @@ def _init_stats(self) -> None:
             features_specs=self.features_specs,
             numeric_features=self.numeric_features,
             categorical_features=self.categorical_features,
+            text_features=self.text_features,
         )
         self.features_stats = self.stats_instance._load_stats()

@@ -404,16 +405,21 @@ def _add_pipeline_categorical(self, feature_name: str, input_layer, stats: dict)
         # adding outputs
         self.outputs[feature_name] = preprocessor.chain(input_layer=input_layer)

-    def _add_pipeline_text(self, feature_name: str, input_layer) -> None:
+    def _add_pipeline_text(self, feature_name: str, input_layer, stats: dict) -> None:
         """Add a text preprocessing step to the pipeline.

         Args:
             feature_name (str): The name of the feature to be preprocessed.
             input_layer: The input layer for the feature.
+            stats (dict): A dictionary containing the metadata of the feature, including the text vocabulary.
         """
         # getting feature object
         _feature = self.features_specs[feature_name]

+        # getting stats
+        _vocab = stats["vocab"]
+        logger.debug(f"TEXT: {_vocab = }")
+
         # initializing preprocessor
         preprocessor = FeaturePreprocessor(name=feature_name)

@@ -440,6 +446,7 @@ def _add_pipeline_text(self, feature_name: str, input_layer) -> None:
         preprocessor.add_processing_step(
             layer_creator=PreprocessorLayerFactory.text_vectorization_layer,
             name=f"text_vactorizer_{feature_name}",
+            vocabulary=_vocab,
             **_feature.kwargs,
         )
         # for concatenation we need the same format
@@ -535,7 +542,7 @@ def build_preprocessor(self) -> tf.keras.Model:
                     stats=stats,
                 )
             # CATEGORICAL FEATURES
-            elif "vocab" in stats:
+            elif "vocab" in stats and feature_name not in self.text_features:
                 self._add_pipeline_categorical(
                     feature_name=feature_name,
                     input_layer=input_layer,
@@ -555,6 +562,7 @@ def build_preprocessor(self) -> tf.keras.Model:
                 self._add_pipeline_text(
                     feature_name=feature_name,
                     input_layer=input_layer,
+                    stats=stats,
                 )

         # Preparing outputs
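Net effect of the processor changes: the text pipeline now receives the feature's stats and builds its vectorizer from the precomputed vocabulary, so no adapt() pass over the data is needed and a rebuilt preprocessor maps tokens identically. A sketch under the assumption that text_vectorization_layer wraps tf.keras.layers.TextVectorization (the vocabulary and input below are illustrative):

import tensorflow as tf

_vocab = ["kdp", "library", "great", "really"]  # e.g. stats["vocab"]

# A fixed vocabulary pins the token-to-id mapping at construction time.
vectorizer = tf.keras.layers.TextVectorization(vocabulary=_vocab, output_mode="int")
print(vectorizer(tf.constant(["kdp library rocks"])))
# [[2 3 1]] -- index 0 is padding, 1 is OOV ("rocks"), known words start at 2

Because text features now also carry a "vocab" entry in their stats, the new "and feature_name not in self.text_features" guard in build_preprocessor is what keeps them out of the categorical branch.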
kdp/stats.py (70 changes: 69 additions & 1 deletion)
@@ -101,13 +101,62 @@ def get_unique_values(self) -> list:
         return tf.unique(all_values)[0].numpy().tolist()


+class TextAccumulator:
+    def __init__(self) -> None:
+        """Initializes the accumulator for text values, where each entry is a list of words separated by spaces.
+
+        Attributes:
+            words (tf.Variable): TensorFlow variable to store unique words as strings.
+        """
+        self.words = tf.Variable(
+            [],
+            dtype=tf.string,
+            shape=tf.TensorShape(None),
+            trainable=False,
+        )
+        logger.info("TextAccumulator initialized.")
+
+    @tf.function
+    def update(self, new_texts: tf.Tensor) -> None:
+        """Updates the accumulator with new text values, extracting words and accumulating unique ones.
+
+        Args:
+            new_texts: A batch of text values (tf.Tensor of dtype tf.string),
+                each entry containing words separated by spaces.
+
+        Raises:
+            ValueError: If the input tensor is not of dtype tf.string.
+        """
+        if new_texts.dtype != tf.string:
+            raise ValueError(f"Unsupported data type for text features: {new_texts.dtype}")
+
+        # Split each string into words and flatten the list
+        new_texts = tf.strings.regex_replace(new_texts, r"\s+", " ")
+        split_words = tf.strings.split(new_texts).flat_values
+        split_words = tf.strings.lower(split_words)
+
+        # Concatenate new words with existing words and update unique words
+        updated_words = tf.unique(tf.concat([self.words, split_words], axis=0))[0]
+        self.words.assign(updated_words)
+
+    def get_unique_words(self) -> list:
+        """Returns the unique words accumulated so far as a list of strings.
+
+        Returns:
+            list of str: Unique words accumulated.
+        """
+        unique_words = self.words.value().numpy().tolist()
+        return unique_words
+
+
 class DatasetStatistics:
     def __init__(
         self,
         path_data: str,
         features_specs: dict[str, FeatureType | str] = None,
         numeric_features: list[NumericalFeature] = None,
         categorical_features: list[CategoricalFeature] = None,
+        text_features: list[CategoricalFeature] = None,
         features_stats_path: Path = None,
         overwrite_stats: bool = False,
         batch_size: int = 50_000,
@@ -124,10 +173,12 @@
                 Easier alternative to providing numerical and categorical lists.
             numeric_features: A list of numerical features to calculate statistics for (defaults to None).
             categorical_features: A list of categorical features to calculate statistics for (defaults to None).
+            text_features: A list of text features to calculate statistics for (defaults to None).
         """
         self.path_data = path_data
         self.numeric_features = numeric_features or []
         self.categorical_features = categorical_features or []
+        self.text_features = text_features or []
         self.features_specs = features_specs or {}
         self.features_stats_path = features_stats_path or "features_stats.json"
         self.overwrite_stats = overwrite_stats
@@ -139,6 +190,7 @@ def __init__(
         # Initializing placeholders for statistics
         self.numeric_stats = {col: WelfordAccumulator() for col in self.numeric_features}
         self.categorical_stats = {col: CategoricalAccumulator() for col in self.categorical_features}
+        self.text_stats = {col: TextAccumulator() for col in self.text_features}

     def _get_csv_file_pattern(self, path) -> str:
         """Get the csv file pattern that will handle directories and file paths.
@@ -187,10 +239,17 @@ def _process_batch(self, batch: tf.Tensor) -> None:
         for feature in self.categorical_features:
             self.categorical_stats[feature].update(batch[feature])

+        for feature in self.text_features:
+            self.text_stats[feature].update(batch[feature])
+
     def _compute_final_statistics(self) -> dict[str, dict]:
         """Compute final statistics for numeric and categorical features."""
         logger.info("Computing final statistics for numeric and categorical features 📊")
-        final_stats = {"numeric_stats": {}, "categorical_stats": {}}
+        final_stats = {
+            "numeric_stats": {},
+            "categorical_stats": {},
+            "text_stats": {},
+        }
         for feature in self.numeric_features:
             logger.debug(f"numeric {feature =}")
             final_stats["numeric_stats"][feature] = {
@@ -219,6 +278,15 @@ def _compute_final_statistics(self) -> dict[str, dict]:
                 "dtype": _dtype,
             }

+        for feature in self.text_features:
+            logger.debug(f"text {feature = }, {self.text_stats = }")
+            unique_words = self.text_stats[feature].get_unique_words()
+            final_stats["text_stats"][feature] = {
+                "size": len(unique_words),
+                "vocab": unique_words,
+                "dtype": self.features_specs[feature].dtype,
+            }
+
         return final_stats

     def calculate_dataset_statistics(self, dataset: tf.data.Dataset) -> dict[str, dict]:
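A quick usage sketch of the new TextAccumulator, assuming it is importable from kdp.stats as added above (the feature strings are illustrative): raw text batches go in, and a deduplicated lowercase word list comes out, which _compute_final_statistics then publishes as the feature's "vocab":

import tensorflow as tf
from kdp.stats import TextAccumulator  # the class added in this commit

acc = TextAccumulator()
acc.update(tf.constant(["KDP makes preprocessing easy", "easy and fast"]))
acc.update(tf.constant(["fast preprocessing"]))

# tf.unique keeps first-occurrence order; tolist() on tf.string yields bytes.
print(acc.get_unique_words())
# [b'kdp', b'makes', b'preprocessing', b'easy', b'and', b'fast']

Declaring the words variable with shape=tf.TensorShape(None) is what lets each assign store a vocabulary of a different length than the one before.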
