Custom Preprocessing Steps Revamp #3

Open
piotrlaczkowski opened this issue May 3, 2024 · 3 comments
Labels
good first issue Good for newcomers

Comments

@piotrlaczkowski
Owner

When defining custom preprocessing steps, we need a way to pass the initialized output of one layer as the input of the next layer, e.g.:

preprocessors=[
    PreprocessorLayerFactory.rescaling_layer,
    PreprocessorLayerFactory.normalization_layer,
],

When defining a custom preprocessor for text columns, we need to be careful about how the first layer impacts the input of the second layer; we do not currently take this into account in an automatic way. For example, if the first layer lowercases everything but the second layer was initialized with an uppercase vocabulary, the second layer will always return the "UNK" token, because the first layer completely modifies the input before it arrives.
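
To make the failure mode concrete, a minimal sketch (using a plain Keras StringLookup layer, not the KDP factory) could look like this: a lookup layer initialized with an uppercase vocabulary maps every lowercased token to the out-of-vocabulary index.

import tensorflow as tf

# Hypothetical illustration: the vocabulary was built on the raw (uppercase) data ...
lookup = tf.keras.layers.StringLookup(vocabulary=["DOG", "CAT", "MOUSE"], oov_token="[UNK]")

# ... but a preceding step lowercases the text before it reaches the lookup layer
lowercased = tf.strings.lower(tf.constant(["DOG", "CAT", "MOUSE"]))

print(lookup(lowercased))      # every token maps to 0, the "[UNK]" index
print(lookup(["DOG", "CAT"]))  # with the original casing the tokens would match: [1, 2]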

@piotrlaczkowski piotrlaczkowski added the good first issue Good for newcomers label May 3, 2024
@piotrlaczkowski piotrlaczkowski moved this to Backlog in KDP Project Board May 3, 2024
@piotrlaczkowski piotrlaczkowski changed the title Custom Preprocessing Steps Custom Preprocessing Steps Revamp May 3, 2024
@piotrlaczkowski
Owner Author

The idea of the potential improvement would be to go over the sequence of preprocessing layers, check what each layer needs for initialization, and then transform the input data through all initialized layers.

Dummy code demonstrating the approach to implement could look like:

import tensorflow as tf

class DynamicPreprocessingPipeline:
    """
    Dynamically initializes a sequence of Keras preprocessing layers based on the output
    from each previous layer, allowing each layer to access the outputs of all prior layers where relevant.
    """

    def __init__(self, layers):
        """
        Initializes the DynamicPreprocessingPipeline with a list of layers.

        Args:
            layers (list): A list of Keras preprocessing layers, each potentially named for reference.
        """
        self.layers = layers

    def initialize_and_transform(self, init_data):
        """
        Sequentially processes each layer, applying transformations selectively based on each layer's
        input requirements and ensuring efficient data usage and processing. Each layer can access the output
        of all previous layers.

        Args:
            init_data (dict): A dictionary with initialization data, dynamically keyed.

        Returns:
            dict: The dictionary containing selectively transformed data for each layer.

        Example:
            ```python
            layers = [
                tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),  # Make text lowercase
                tf.keras.layers.Lambda(lambda x: tf.strings.regex_replace(x, '[^a-z]', ''), name="remove_non_alpha")  # Remove non-alphabetic characters
            ]
            pipeline = DynamicPreprocessingPipeline(layers)
            init_data = {
                'text': tf.constant(["DOG!", "CAT", "DOG", "MOUSE"])
            }
            transformed_data = pipeline.initialize_and_transform(init_data)
            ```
        """
        # Work on a copy so the caller's init_data is not mutated
        current_data = dict(init_data)

        for layer in self.layers:
            # Determine necessary keys for the current layer based on its input spec
            if getattr(layer, 'input_spec', None) is not None:
                required_keys = set(
                    tf.nest.flatten(tf.nest.map_structure(lambda spec: spec.name, layer.input_spec))
                )
            else:
                # If no input_spec, assume all currently available data is needed
                required_keys = set(current_data.keys())

            # Prepare input for the current layer: the first layer sees the raw init_data,
            # subsequent layers also see the outputs of all previous layers that are relevant
            current_input = {k: v for k, v in current_data.items() if k in required_keys}

            # Apply the transformation selectively; the result is stored under the layer's name
            transformed_output = {layer.name: layer(v) for v in current_input.values()}

            # Update current data with the transformed output for use by the next layer
            current_data.update(transformed_output)

        return current_data

# Example usage
layers = [
    tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),  # Make text lowercase
    tf.keras.layers.Lambda(lambda x: tf.strings.regex_replace(x, '[^a-z]', ''), name="remove_non_alpha")  # Remove non-alphabetic characters
]
pipeline = DynamicPreprocessingPipeline(layers)
init_data = {
    'text': tf.constant(["DOG!", "CAT", "DOG", "MOUSE"])
}
transformed_data = pipeline.initialize_and_transform(init_data)
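
The piece the dummy code above does not yet cover is the initialization itself: a later layer (e.g. a StringLookup or Normalization) should be adapted on data that has already passed through the earlier layers, so its vocabulary or statistics match what it will actually see. A minimal sketch of that idea, assuming standard adaptable Keras layers rather than the KDP factory (adapt_layers_sequentially is a hypothetical helper):

# Hypothetical helper: adapt each adaptable layer on the data as it looks *after*
# the previous layers have run, so vocabularies/statistics match the real input
def adapt_layers_sequentially(layers, data):
    current = data
    for layer in layers:
        if hasattr(layer, 'adapt'):
            layer.adapt(current)  # e.g. StringLookup builds its vocabulary here
        current = layer(current)  # feed the transformed data to the next layer
    return current

layers = [
    tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),
    tf.keras.layers.StringLookup(name="lookup"),  # vocabulary learned from lowercased text
]
adapt_layers_sequentially(layers, tf.constant(["DOG!", "CAT", "DOG", "MOUSE"]))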

@piotrlaczkowski
Owner Author

A more complex but more optimized implementation could be:

import tensorflow as tf

class DynamicPreprocessingPipeline:
    """
    Dynamically initializes and manages a sequence of Keras preprocessing layers, with selective retention of outputs
    based on dependencies among layers, and supports streaming data through the pipeline.
    """

    def __init__(self, layers):
        """
        Initializes the pipeline with a list of preprocessing layers.

        Args:
            layers (list): A list of TensorFlow preprocessing layers, each potentially named for reference.
        """
        self.layers = layers
        self.dependency_map = self._analyze_dependencies()

    def _analyze_dependencies(self):
        """
        Analyzes and determines the dependencies of each layer on the outputs of previous layers.

        Returns:
            dict: A dictionary mapping each layer to the set of layer outputs it depends on.
        """
        dependencies = {}
        all_outputs = set()

        for layer in self.layers:
            if getattr(layer, 'input_spec', None) is not None:
                required_inputs = set(tf.nest.flatten(tf.nest.map_structure(lambda spec: spec.name, layer.input_spec)))
            else:
                # If no specific input_spec, depend on all previous outputs; copy the set so
                # later additions to all_outputs do not leak into this entry
                required_inputs = set(all_outputs)

            dependencies[layer.name] = required_inputs

            # Ensure that each layer's output is available as potential input to subsequent layers
            all_outputs.add(layer.name)

        return dependencies

    def process(self, dataset):
        """
        Processes the dataset through the pipeline using TensorFlow's tf.data API for efficient streaming.

        Args:
            dataset (tf.data.Dataset): The dataset to process.

        Returns:
            tf.data.Dataset: The processed dataset.
        """
        def _apply_transformations(features):
            # Copy so the original element dict is not mutated inside the map call
            current_data = dict(features)
            for layer in self.layers:
                # Prepare input from the dependency map; if no dependency was recorded
                # (e.g. for the first layer), fall back to all available features
                required_keys = self.dependency_map[layer.name] or set(current_data.keys())
                current_input = {k: current_data[k] for k in required_keys if k in current_data}

                # Apply transformation; the result is stored under the layer's name
                transformed_output = {layer.name: layer(v) for v in current_input.values()}

                # Update current data with the transformed output
                current_data.update(transformed_output)

            return current_data

        return dataset.map(_apply_transformations)

# Example usage
layers = [
    tf.keras.layers.Lambda(lambda x: tf.strings.lower(x), name="lowercase"),
    tf.keras.layers.Lambda(lambda x: tf.strings.regex_replace(x, '[^a-z]', ''), name="remove_non_alpha")
]
pipeline = DynamicPreprocessingPipeline(layers)
dataset = tf.data.Dataset.from_tensor_slices({
    'text': ["DOG!", "CAT", "DOG", "MOUSE"]
}).batch(2)  # Example of streaming in batches
processed_dataset = pipeline.process(dataset)

# Print results
for item in processed_dataset:
    print(item)
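
Since _apply_transformations is a pure tensor-to-tensor function, the processed dataset can also be combined with the usual tf.data options; a small usage sketch (same pipeline and dataset as above):

processed_dataset = pipeline.process(dataset).prefetch(tf.data.AUTOTUNE)

for batch in processed_dataset.take(1):
    # Each batch carries the raw feature plus one entry per layer output
    print(sorted(batch.keys()))  # ['lowercase', 'remove_non_alpha', 'text']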

@piotrlaczkowski
Owner Author

Another idea:

import tensorflow as tf

class DynamicPreprocessingPipeline:
    """
    Dynamically initializes and manages a sequence of Keras preprocessing layers, with selective retention of outputs
    based on dependencies among layers, and supports streaming data through the pipeline.
    """
    def __init__(self, layers):
        self.layers = layers
        self.dependency_map = self._analyze_dependencies()

    def _analyze_dependencies(self):
        dependencies = {}
        all_outputs = set()
        for layer in self.layers:
            if getattr(layer, 'input_spec', None) is not None:
                required_inputs = set(tf.nest.flatten(tf.nest.map_structure(lambda spec: spec.name, layer.input_spec)))
            else:
                # Copy the set so later additions to all_outputs do not leak into this entry
                required_inputs = set(all_outputs)
            dependencies[layer.name] = required_inputs
            all_outputs.add(layer.name)
        return dependencies

    def process(self, dataset):
        def _apply_transformations(features):
            current_data = dict(features)  # avoid mutating the original element
            for layer in self.layers:
                # Fall back to all available features when no dependency was recorded
                required_keys = self.dependency_map[layer.name] or set(current_data.keys())
                current_input = {k: current_data[k] for k in required_keys if k in current_data}
                transformed_output = {layer.name: layer(v) for v in current_input.values()}
                current_data.update(transformed_output)
            return current_data
        return dataset.map(_apply_transformations)

# Example usage assuming layer factory methods are compatible with tf.keras.layers
layers = [
    PreprocessorLayerFactory.rescaling_layer(),  # Assuming this is a callable returning a TensorFlow layer
    PreprocessorLayerFactory.normalization_layer()  # Assuming this is a callable returning a TensorFlow layer
]
pipeline = DynamicPreprocessingPipeline(layers)
dataset = tf.data.Dataset.from_tensor_slices({
    'image_data': tf.random.uniform([10, 256, 256, 3])  # Example data
}).batch(2)
processed_dataset = pipeline.process(dataset)

# Print results
for item in processed_dataset:
    print(item)
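
For the numeric case, a minimal sketch with plain Keras layers standing in for the assumed factory calls (tf.keras.layers.Rescaling plus a Normalization layer, which must be adapted on data before use) could be:

# Plain Keras stand-ins for the assumed factory calls (illustration only)
rescale = tf.keras.layers.Rescaling(1.0 / 255, name="rescaling")
normalize = tf.keras.layers.Normalization(name="normalization")

images = tf.random.uniform([10, 256, 256, 3], maxval=255.0)

# Normalization must be initialized on data before use; adapting it on already
# rescaled data mirrors the sequential-initialization idea from this issue
normalize.adapt(rescale(images))

# The two initialized layers could then be handed to the pipeline sketched above
layers = [rescale, normalize]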

The combination of these ideas should give some nice starter code.
