Data loader

This module defines the data loader classes used in the tutorials to load and preprocess data. It expects the data to already be split into training and test sets and stored in CSV format.

###############################################################################
# File:         dataloader.py
# Description:  Prepare data for neural networks
# Authors:      Luong-Ha Nguyen & James-A. Goulet
# Created:      October 12, 2022
# Updated:      October 30, 2022
# Contact:      [email protected] & [email protected]
# Copyright (c) 2022 Luong-Ha Nguyen & James-A. Goulet. Some rights reserved.
###############################################################################
from abc import ABC, abstractmethod

import numpy as np
import pandas as pd
from pytagi import Normalizer, Utils


class DataloaderBase(ABC):
    """Dataloader template"""

    normalizer: Normalizer = Normalizer()

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size

    @abstractmethod
    def process_data(self) -> dict:
        """Process the raw data and return a dataloader dictionary"""
        raise NotImplementedError

    def create_data_loader(self, raw_input: np.ndarray,
                           raw_output: np.ndarray) -> list:
        """Create dataloader based on batch size"""
        num_input_data = raw_input.shape[0]
        num_output_data = raw_output.shape[0]
        assert num_input_data == num_output_data

        # Even indices
        even_indices = self.split_evenly(num_input_data, self.batch_size)

        if np.mod(num_input_data, self.batch_size) != 0:
            # Remainder indices: pad the last, incomplete batch
            rem_indices = self.split_reminder(num_input_data, self.batch_size)
            even_indices.append(rem_indices)

        indices = np.stack(even_indices)
        input_data = raw_input[indices]
        output_data = raw_output[indices]
        dataset = []
        for x_batch, y_batch in zip(input_data, output_data):
            dataset.append((x_batch, y_batch))
        return dataset

    @staticmethod
    def split_data(data: np.ndarray,
                   test_ratio: float = 0.2,
                   val_ratio: float = 0.0) -> dict:
        """Split data into training, validation, and test sets"""
        num_data = data.shape[0]
        splits = {}
        if val_ratio != 0.0:
            end_val_idx = num_data - int(test_ratio * num_data)
            end_train_idx = int(end_val_idx - val_ratio * end_val_idx)
            splits["train"] = data[:end_train_idx]
            splits["val"] = data[end_train_idx:end_val_idx]
            splits["test"] = data[end_val_idx:]
        else:
            end_train_idx = num_data - int(test_ratio * num_data)
            splits["train"] = data[:end_train_idx]
            splits["val"] = []
            splits["test"] = data[end_train_idx:]

        return splits

    @staticmethod
    def load_data_from_csv(data_file: str) -> np.ndarray:
        """Load data from a csv file"""
        data = pd.read_csv(data_file, skiprows=1, delimiter=",", header=None)

        return data.values

    @staticmethod
    def split_evenly(num_data: int, chunk_size: int):
        """Split data indices evenly into chunks of size `chunk_size`"""
        indices = np.arange(int(num_data - np.mod(num_data, chunk_size)))

        return np.split(indices, int(np.floor(num_data / chunk_size)))

    @staticmethod
    def split_reminder(num_data: int, chunk_size: int):
        """Pad the remainder indices with randomly drawn indices so that the
        last batch reaches `chunk_size`"""
        indices = np.arange(num_data)
        remainder_start = int(num_data - np.mod(num_data, chunk_size))
        num_samples = chunk_size - (num_data - remainder_start)
        random_idx = np.random.choice(indices, size=num_samples, replace=False)
        remainder_idx = indices[remainder_start:]

        return np.concatenate((random_idx, remainder_idx))
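
# -----------------------------------------------------------------------------
# Illustration (not part of the original file): a minimal sketch of how the two
# batching helpers above behave. With 10 data points and a batch size of 4,
# `split_evenly` keeps the two full batches and `split_reminder` pads the two
# leftover indices with randomly drawn ones so the last batch also has size 4.
#
# DataloaderBase.split_evenly(10, 4)    # -> [array([0, 1, 2, 3]), array([4, 5, 6, 7])]
# DataloaderBase.split_reminder(10, 4)  # -> e.g. array([3, 6, 8, 9])
# -----------------------------------------------------------------------------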


class RegressionDataLoader(DataloaderBase):
    """Load and format data that are fed to the neural network.
    The user must provide the input and output data files in CSV format"""

    def __init__(self, batch_size: int, num_inputs: int,
                 num_outputs: int) -> None:
        super().__init__(batch_size)
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs

    def process_data(self, x_train_file: str, y_train_file: str,
                     x_test_file: str, y_test_file: str) -> dict:
        """Process data from the csv file"""

        # Load data
        x_train = self.load_data_from_csv(x_train_file)
        y_train = self.load_data_from_csv(y_train_file)
        x_test = self.load_data_from_csv(x_test_file)
        y_test = self.load_data_from_csv(y_test_file)

        # Normalizer
        x_mean, x_std = self.normalizer.compute_mean_std(
            np.concatenate((x_train, x_test)))
        y_mean, y_std = self.normalizer.compute_mean_std(
            np.concatenate((y_train, y_test)))

        x_train = self.normalizer.standardize(data=x_train,
                                              mu=x_mean,
                                              std=x_std)
        y_train = self.normalizer.standardize(data=y_train,
                                              mu=y_mean,
                                              std=y_std)
        x_test = self.normalizer.standardize(data=x_test, mu=x_mean, std=x_std)
        y_test = self.normalizer.standardize(data=y_test, mu=y_mean, std=y_std)

        # Dataloader
        data_loader = {}
        data_loader["train"] = (x_train, y_train)
        data_loader["test"] = self.create_data_loader(raw_input=x_test,
                                                      raw_output=y_test)
        data_loader["x_norm_param_1"] = x_mean
        data_loader["x_norm_param_2"] = x_std
        data_loader["y_norm_param_1"] = y_mean
        data_loader["y_norm_param_2"] = y_std

        return data_loader
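
# -----------------------------------------------------------------------------
# Usage sketch (not part of the original file): the file names below are
# placeholders for CSV files holding the pre-split inputs and outputs.
#
# reg_loader = RegressionDataLoader(batch_size=16, num_inputs=1, num_outputs=1)
# data = reg_loader.process_data(x_train_file="x_train.csv",
#                                y_train_file="y_train.csv",
#                                x_test_file="x_test.csv",
#                                y_test_file="y_test.csv")
# x_train, y_train = data["train"]        # standardized training arrays
# for x_batch, y_batch in data["test"]:   # list of (input, output) batches
#     ...
# -----------------------------------------------------------------------------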


class MnistDataloader(DataloaderBase):
    """Data loader for mnist dataset"""

    def process_data(self, x_train_file: str, y_train_file: str,
                     x_test_file: str, y_test_file: str) -> dict:
        """Process mnist images"""
        # Initialization
        utils = Utils()
        num_train_images = 60000
        num_test_images = 10000

        # Training set
        train_images, train_labels = utils.load_mnist_images(
            image_file=x_train_file,
            label_file=y_train_file,
            num_images=num_train_images)

        y_train, y_train_idx, num_enc_obs = utils.label_to_obs(
            labels=train_labels, num_classes=10)
        x_mean, x_std = self.normalizer.compute_mean_std(train_images)
        x_std = 1  # keep the raw pixel scale; standardization only subtracts the mean

        # Test set
        test_images, test_labels = utils.load_mnist_images(
            image_file=x_test_file,
            label_file=y_test_file,
            num_images=num_test_images)

        # Normalizer
        x_train = self.normalizer.standardize(data=train_images,
                                              mu=x_mean,
                                              std=x_std)
        x_test = self.normalizer.standardize(data=test_images,
                                             mu=x_mean,
                                             std=x_std)

        y_train = y_train.reshape((num_train_images, num_enc_obs))
        y_train_idx = y_train_idx.reshape((num_train_images, num_enc_obs))
        x_train = x_train.reshape((num_train_images, 28, 28))
        x_test = x_test.reshape((num_test_images, 28, 28))

        # Data loader
        data_loader = {}
        data_loader["train"] = (x_train, y_train, y_train_idx, train_labels)
        data_loader["test"] = self.create_data_loader(raw_input=x_test,
                                                      raw_output=test_labels)
        data_loader["x_norm_param_1"] = x_mean
        data_loader["x_norm_param_2"] = x_std

        return data_loader
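
# -----------------------------------------------------------------------------
# Usage sketch (not part of the original file), assuming the standard MNIST
# binary files (idx3-ubyte images, idx1-ubyte labels) are available locally.
#
# mnist_loader = MnistDataloader(batch_size=16)
# data = mnist_loader.process_data(
#     x_train_file="train-images-idx3-ubyte",
#     y_train_file="train-labels-idx1-ubyte",
#     x_test_file="t10k-images-idx3-ubyte",
#     y_test_file="t10k-labels-idx1-ubyte")
# x_train, y_train, y_train_idx, train_labels = data["train"]
# -----------------------------------------------------------------------------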


class TimeSeriesDataloader(DataloaderBase):
    """Data loader for time series"""

    def __init__(self, batch_size: int, output_col: np.ndarray,
                 input_seq_len: int, output_seq_len: int, num_features: int,
                 stride: int) -> None:
        super().__init__(batch_size)
        self.output_col = output_col
        self.input_seq_len = input_seq_len
        self.output_seq_len = output_seq_len
        self.num_features = num_features
        self.stride = stride

    def process_data(self, x_train_file: str, datetime_train_file: str,
                     x_test_file: str, datetime_test_file: str) -> dict:
        """Process time series"""
        # Initialization
        utils = Utils()

        # Load data
        x_train = self.load_data_from_csv(x_train_file)
        datetime_train = self.load_data_from_csv(datetime_train_file)

        x_test = self.load_data_from_csv(x_test_file)
        datetime_test = self.load_data_from_csv(datetime_test_file)

        # Normalizer
        x_mean, x_std = self.normalizer.compute_mean_std(x_train)
        x_train = self.normalizer.standardize(data=x_train,
                                              mu=x_mean,
                                              std=x_std)
        x_test = self.normalizer.standardize(data=x_test, mu=x_mean, std=x_std)

        # Create rolling windows
        x_train_rolled, y_train_rolled = utils.create_rolling_window(
            data=x_train,
            output_col=self.output_col,
            input_seq_len=self.input_seq_len,
            output_seq_len=self.output_seq_len,
            num_features=self.num_features,
            stride=self.stride)

        x_test_rolled, y_test_rolled = utils.create_rolling_window(
            data=x_test,
            output_col=self.output_col,
            input_seq_len=self.input_seq_len,
            output_seq_len=self.output_seq_len,
            num_features=self.num_features,
            stride=self.stride)

        # Dataloader
        data_loader = {}
        data_loader["train"] = (x_train_rolled, y_train_rolled)
        data_loader["test"] = self.create_data_loader(raw_input=x_test_rolled,
                                                      raw_output=y_test_rolled)
        # Store normalization parameters
        data_loader["x_norm_param_1"] = x_mean
        data_loader["x_norm_param_2"] = x_std
        data_loader["y_norm_param_1"] = x_mean[self.output_col]
        data_loader["y_norm_param_2"] = x_std[self.output_col]

        # NOTE: Datetimes are saved for visualization purposes
        data_loader["datetime_train"] = [
            np.datetime64(date) for date in np.squeeze(datetime_train)
        ]
        data_loader["datetime_test"] = [
            np.datetime64(date) for date in np.squeeze(datetime_test)
        ]

        return data_loader
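
# -----------------------------------------------------------------------------
# Usage sketch (not part of the original file): file names are placeholders for
# a univariate series. With input_seq_len=24 and output_seq_len=1, each rolling
# window maps the previous 24 observations to the next one.
#
# ts_loader = TimeSeriesDataloader(batch_size=16,
#                                  output_col=np.array([0]),
#                                  input_seq_len=24,
#                                  output_seq_len=1,
#                                  num_features=1,
#                                  stride=1)
# data = ts_loader.process_data(x_train_file="x_train.csv",
#                               datetime_train_file="datetime_train.csv",
#                               x_test_file="x_test.csv",
#                               datetime_test_file="datetime_test.csv")
# -----------------------------------------------------------------------------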