Custom Preprocessing Steps Revamp #3
The idea of the potential improvement would be to go over the sequence of preprocessing layers, check what each layer needs for initialisation, and then transform the input data through all initialised layers. Dummy code demonstrating the approach could look like:

```python
import tensorflow as tf


class DynamicPreprocessingPipeline:
    """
    Dynamically initializes a sequence of Keras preprocessing layers based on the
    output of each previous layer, allowing each layer to access the outputs of all
    prior layers where relevant.
    """

    def __init__(self, layers):
        """
        Initializes the DynamicPreprocessingPipeline with a list of layers.

        Args:
            layers (list): A list of Keras preprocessing layers, each named for reference.
        """
        self.layers = layers

    def initialize_and_transform(self, init_data):
        """
        Sequentially processes each layer, applying transformations selectively based
        on each layer's input requirements. Each layer can access the output of all
        previous layers.

        Args:
            init_data (dict): A dictionary with initialization data, dynamically keyed.

        Returns:
            dict: The dictionary containing selectively transformed data for each layer.
        """
        current_data = dict(init_data)
        for layer in self.layers:
            # Determine the keys the current layer needs. Keras leaves input_spec as
            # None for most layers, so fall back to all currently available data.
            if getattr(layer, 'input_spec', None) is not None:
                required_keys = set(
                    tf.nest.flatten(tf.nest.map_structure(lambda x: x.name, layer.input_spec))
                )
            else:
                required_keys = list(current_data.keys())

            # Select the relevant inputs: the raw init_data on the first pass, the
            # accumulated outputs of all previous layers afterwards.
            current_input = {k: current_data[k] for k in required_keys if k in current_data}

            # Apply the layer to each selected tensor and store the result under the
            # layer's name; in this sketch the last application wins if several
            # inputs are selected.
            transformed_output = {layer.name: layer(current_input[k]) for k in current_input}

            # Make the transformed output available to the next layer.
            current_data.update(transformed_output)
        return current_data


# Example usage
layers = [
    tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),  # make text lowercase
    tf.keras.layers.Lambda(lambda x: tf.strings.regex_replace(x, '[^a-z]', ''), name="remove_non_alpha"),  # remove non-alphabetic characters
]
pipeline = DynamicPreprocessingPipeline(layers)
init_data = {
    'text': tf.constant(["DOG!", "CAT", "DOG", "MOUSE"])
}
transformed_data = pipeline.initialize_and_transform(init_data)
```
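With this sketch, `transformed_data` would hold the raw input plus one entry per layer, so the fully cleaned text can be inspected at the last layer's key:

```python
print(transformed_data["remove_non_alpha"])
# expected output, roughly: tf.Tensor([b'dog' b'cat' b'dog' b'mouse'], shape=(4,), dtype=string)
```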
A more complex but more optimized implementation could be:

```python
import tensorflow as tf


class DynamicPreprocessingPipeline:
    """
    Dynamically initializes and manages a sequence of Keras preprocessing layers,
    with selective retention of outputs based on the dependencies among layers, and
    supports streaming data through the pipeline.
    """

    def __init__(self, layers):
        """
        Initializes the pipeline with a list of preprocessing layers.

        Args:
            layers (list): A list of TensorFlow preprocessing layers, each named for reference.
        """
        self.layers = layers
        self.dependency_map = self._analyze_dependencies()

    def _analyze_dependencies(self):
        """
        Analyzes and determines the dependencies of each layer on the outputs of
        previous layers.

        Returns:
            dict: A dictionary mapping each layer's name to the set of layer outputs it depends on.
        """
        dependencies = {}
        all_outputs = set()
        for layer in self.layers:
            if getattr(layer, 'input_spec', None) is not None:
                required_inputs = set(
                    tf.nest.flatten(tf.nest.map_structure(lambda x: x.name, layer.input_spec))
                )
            else:
                # Without an explicit input_spec, depend on all previous outputs.
                # Copy the set, otherwise later additions to all_outputs would leak
                # into this layer's dependencies.
                required_inputs = set(all_outputs)
            dependencies[layer.name] = required_inputs
            all_outputs.update(required_inputs)
            # Each layer's output becomes a potential input to subsequent layers.
            all_outputs.add(layer.name)
        return dependencies

    def process(self, dataset):
        """
        Processes the dataset through the pipeline using TensorFlow's tf.data API
        for efficient streaming.

        Args:
            dataset (tf.data.Dataset): The dataset to process.

        Returns:
            tf.data.Dataset: The processed dataset.
        """
        def _apply_transformations(features):
            current_data = dict(features)
            for layer in self.layers:
                # Look up the inputs this layer needs; if the dependency analysis
                # found none (e.g. for the first layer), fall back to the raw features.
                required_keys = self.dependency_map[layer.name] or set(current_data.keys())
                current_input = {k: current_data[k] for k in required_keys if k in current_data}
                # Apply the layer to each selected tensor; the last application wins
                # if several inputs are selected.
                transformed_output = {layer.name: layer(current_input[k]) for k in current_input}
                current_data.update(transformed_output)
            return current_data

        return dataset.map(_apply_transformations)


# Example usage
layers = [
    tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),
    tf.keras.layers.Lambda(lambda x: tf.strings.regex_replace(x, '[^a-z]', ''), name="remove_non_alpha"),
]
pipeline = DynamicPreprocessingPipeline(layers)
dataset = tf.data.Dataset.from_tensor_slices({
    'text': ["DOG!", "CAT", "DOG", "MOUSE"]
}).batch(2)  # example of streaming in batches
processed_dataset = pipeline.process(dataset)

# Print results
for item in processed_dataset:
    print(item)
```
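Neither sketch yet covers the initialisation part for stateful layers (the "check what they need for initialisation" step). A minimal sketch of that idea, assuming each stateful layer exposes the standard `adapt()` method of Keras preprocessing layers (the helper name `adapt_sequentially` and the `input_key` argument are made up for illustration), would be to fit every layer on data that has already been transformed by its predecessors:

```python
def adapt_sequentially(layers, dataset, input_key):
    """Adapt each stateful layer on data already transformed by the previous layers."""
    current = dataset.map(lambda features: features[input_key])
    for layer in layers:
        if hasattr(layer, "adapt"):
            # Fit the layer (e.g. StringLookup or Normalization) on the
            # already-transformed stream, so its learned state matches what it
            # will actually receive at transform time.
            layer.adapt(current)
        current = current.map(layer)
    return layers
```

This way a vocabulary-based layer placed after a lowercasing layer would learn its vocabulary from already-lowercased strings, avoiding the mismatch described below.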
Another idea is to reuse the same `DynamicPreprocessingPipeline`, but build the layers through `PreprocessorLayerFactory`:
```python
# Example usage, assuming the PreprocessorLayerFactory methods are callables
# returning tf.keras-compatible preprocessing layers.
layers = [
    PreprocessorLayerFactory.rescaling_layer(),
    PreprocessorLayerFactory.normalization_layer(),
]
pipeline = DynamicPreprocessingPipeline(layers)
dataset = tf.data.Dataset.from_tensor_slices({
    'image_data': tf.random.uniform([10, 256, 256, 3])  # example data
}).batch(2)
processed_dataset = pipeline.process(dataset)

# Print results
for item in processed_dataset:
    print(item)
```

The combination of these ideas should give some nice code for starters.
When defining custom preprocessing steps we need to find a way to pass the initialized output of the first layer to the input of the consecutive layer.

For example, when defining a custom preprocessor for text columns we need to be careful about how the first layer impacts the input of the second layer; we do not necessarily take this into account currently (in an automatic way). If the first layer makes everything lowercase but the second layer was initialized with an uppercase vocabulary, this becomes a problem: the second layer will always produce the "UNK" token, since the first layer completely modifies its input.
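A minimal sketch of this failure mode, using `tf.keras.layers.StringLookup` (the vocabulary and data here are made up for illustration):

```python
import tensorflow as tf

# Vocabulary collected from the raw, uppercase data.
lookup = tf.keras.layers.StringLookup(vocabulary=["DOG", "CAT", "MOUSE"])
lowercase = tf.keras.layers.Lambda(lambda x: tf.strings.lower(x))

raw = tf.constant(["DOG", "CAT", "MOUSE"])

# Applied directly, the lookup resolves every token (index 0 is the OOV bucket).
print(lookup(raw))             # tf.Tensor([1 2 3], shape=(3,), dtype=int64)

# With lowercasing in front, nothing matches the uppercase vocabulary any more,
# so every token falls into the OOV ("[UNK]") bucket.
print(lookup(lowercase(raw)))  # tf.Tensor([0 0 0], shape=(3,), dtype=int64)
```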