[WIP] Dataset builder (#32)
* Dataset main class renaming
* Fixed a bug when cloning a pipeline
* Added a dataset builder with three failure specification modes (a minimal data sketch follows this list):
  * A single file with:
        An increasing feature holding the cumulative time the piece has been in place
        A life (cycle) id column
        A life-end indicator column
  * Two files: data + a list of failures, with support for uploading multiple file pairs
  * Separated cycles
* Added a local test for dataset building
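As a rough illustration (not part of the commit), the single-file format described above could look like the toy DataFrame below; the column names (cumulative_time, life_id, life_end) are placeholders chosen for this sketch, not names the builder requires:

```python
import pandas as pd

# Hypothetical toy data in the single-file format: two run-to-failure cycles
# stored in one table.
df = pd.DataFrame(
    {
        # Increasing feature: cumulative time the piece has been in place,
        # resetting at the start of each cycle.
        "cumulative_time": [1, 2, 3, 4, 1, 2, 3],
        # Identifier of the run-to-failure cycle (life) each row belongs to.
        "life_id": [1, 1, 1, 1, 2, 2, 2],
        # Indicator marking the last sample of each life.
        "life_end": [0, 0, 0, 1, 0, 0, 1],
        # Any number of sensor features.
        "temperature": [70.1, 70.4, 71.0, 75.3, 69.8, 70.2, 74.9],
    }
)
print(df)
```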
lucianolorenti authored Feb 24, 2024
1 parent 7862b3c commit 700458e
Showing 47 changed files with 2,809 additions and 798 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -28,14 +28,15 @@ jobs:
python -m pip install wheel setuptools cython
python -m pip install tensorflow
if [ -f requirements.txt ]; then python -m pip install -r requirements.txt; fi
python -m pip install -e $GITHUB_WORKSPACE
python -m pip install -e $GITHUB_WORKSPACE[test]
- name: Test with pytest
run: |
coverage run --source=. -m pytest
coverage report -m
- name: Coveralls
uses: AndreMiras/coveralls-python-action@develop
with:
2 changes: 1 addition & 1 deletion ceruleo/__init__.py
@@ -9,4 +9,4 @@
CACHE_PATH.mkdir(parents=True, exist_ok=True)


__version__ = "2.0.6"
__version__ = "3.0.0"
4 changes: 2 additions & 2 deletions ceruleo/dataset/analysis/correlation.py
@@ -2,12 +2,12 @@
from typing import List, Optional, Tuple

import pandas as pd
from ceruleo.dataset.ts_dataset import AbstractTimeSeriesDataset
from ceruleo.dataset.ts_dataset import AbstractPDMDataset
from ceruleo.dataset.utils import iterate_over_features


def correlation_analysis(
dataset: AbstractTimeSeriesDataset,
dataset: AbstractPDMDataset,
corr_threshold: float = 0.7,
features: Optional[List[str]] = None,
) -> pd.DataFrame:
6 changes: 3 additions & 3 deletions ceruleo/dataset/analysis/distribution.py
@@ -4,7 +4,7 @@

import numpy as np
import pandas as pd
from ceruleo.dataset.ts_dataset import AbstractTimeSeriesDataset
from ceruleo.dataset.ts_dataset import AbstractPDMDataset
from ceruleo.dataset.utils import iterate_over_features
from scipy.special import kl_div
from scipy.stats import wasserstein_distance
@@ -31,7 +31,7 @@ def histogram_per_life(
logger.info(f"Error {e} when computing the distribution for feature {feature}")


def compute_bins(ds: AbstractTimeSeriesDataset, feature: str, number_of_bins: int = 15):
def compute_bins(ds: AbstractPDMDataset, feature: str, number_of_bins: int = 15):
min_value = ds.get_features_of_life(0)[feature].min()
max_value = ds.get_features_of_life(0)[feature].max()

@@ -43,7 +43,7 @@


def features_divergeces(
ds: AbstractTimeSeriesDataset,
ds: AbstractPDMDataset,
number_of_bins: int = 15,
columns: Optional[List[str]] = None,
show_progress: bool = False,
4 changes: 2 additions & 2 deletions ceruleo/dataset/analysis/numerical_features.py
@@ -10,7 +10,7 @@
from uncertainties import ufloat

from ceruleo.dataset.transformed import TransformedDataset
from ceruleo.dataset.ts_dataset import AbstractLivesDataset
from ceruleo.dataset.ts_dataset import AbstractPDMDataset
from ceruleo.dataset.utils import iterate_over_features_and_target


@@ -184,7 +184,7 @@ def merge_analysis(data: dict) -> pd.DataFrame:


def analysis(
dataset: Union[TransformedDataset, AbstractLivesDataset],
dataset: Union[TransformedDataset, AbstractPDMDataset],
*,
show_progress: bool = False,
what_to_compute: List[str] = [],
9 changes: 4 additions & 5 deletions ceruleo/dataset/analysis/sample_rate.py
@@ -3,14 +3,13 @@

import numpy as np
import pandas as pd
from ceruleo.dataset.ts_dataset import AbstractTimeSeriesDataset
from ceruleo.dataset.ts_dataset import AbstractPDMDataset

logger = logging.getLogger(__name__)


def sample_rate(ds: AbstractTimeSeriesDataset, unit: str = "s") -> np.ndarray:
"""
Obtain an array of time difference between two consecutive samples.
def sample_rate(ds: AbstractPDMDataset, unit: str = "s") -> np.ndarray:
"""Obtain an array of time difference between two consecutive samples
If the index it's a timestamp, the time difference will be converted to the provided unit
@@ -32,7 +31,7 @@ def sample_rate(ds: AbstractTimeSeriesDataset, unit: str = "s") -> np.ndarray:


def sample_rate_summary(
ds: AbstractTimeSeriesDataset, unit: Optional[str] = "s"
ds: AbstractPDMDataset, unit: Optional[str] = "s"
) -> pd.DataFrame:
"""
Obtain the mean, mode and standard deviation of the sample rate of the dataset
Empty file.
180 changes: 180 additions & 0 deletions ceruleo/dataset/builder/builder.py
@@ -0,0 +1,180 @@
import logging
import os
from pathlib import Path
from typing import Callable, List, Optional, Tuple, Union


import pandas as pd
from tqdm.auto import tqdm

from ceruleo.dataset.builder.cycles_splitter import (
CyclesSplitter,
FailureDataCycleSplitter,
)
from ceruleo.dataset.builder.output import OutputMode
from ceruleo.dataset.builder.rul_column import RULColumn
from ceruleo.dataset.ts_dataset import PDMDataset

logger = logging.getLogger(__name__)


def load_dataframe(path: Union[str, Path]) -> pd.DataFrame:
if isinstance(path, str):
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"File {path} does not exist")
if path.suffix == ".csv":
return pd.read_csv(path)
if path.suffix == ".parquet":
return pd.read_parquet(path)
if path.suffix == ".xlsx":
return pd.read_excel(path)
raise ValueError(f"Unsupported file format {path.suffix}")


class DatasetBuilder:
splitter: CyclesSplitter
output_mode: OutputMode
rul_column: RULColumn
dataframe_loader: Callable[[Union[str, Path]], pd.DataFrame]
index_column: Optional[str]

def __init__(
self,
dataframe_loader: Callable[[Union[str, Path]], pd.DataFrame] = load_dataframe,
):
"""Initializes the builder."""
self.output_mode = None
self.splitter = None
self.dataframe_loader = dataframe_loader
self.index_column = None
self.rul_column = None

@staticmethod
def one_file_format():
return DatasetBuilder()

def set_splitting_method(self, splitter: CyclesSplitter):
self.splitter = splitter
return self

def set_index_column(self, index_column: str):
self.index_column = index_column
return self

def set_machine_id_feature(self, name: str):
self._machine_type_feature = name
return self

def set_rul_column_method(self, rul_column: RULColumn):
self.rul_column = rul_column
return self

def set_output_mode(self, output_mode: OutputMode):
self.output_mode = output_mode
return self

def _validate(self):
if self.output_mode is None:
raise ValueError("Output mode not set")
if self.splitter is None:
raise ValueError("Splitting method not set")

def build(self, input_path: Path):
self._validate()
self.splitter.split(input_path, self.output_mode)

def prepare_from_data_fault_pairs_files(
self, data_fault_pairs: Union[Tuple[str, str], List[Tuple[str, str]]]
):
if not isinstance(data_fault_pairs, list):
data_fault_pairs = [data_fault_pairs]

if not isinstance(self.splitter, FailureDataCycleSplitter):
raise ValueError(
"This method is only available for FailureDataCycleSplitter"
)

common_path_prefix = os.path.commonprefix(
[data for data, fault in data_fault_pairs]
)

for i, (data, fault) in enumerate(tqdm(data_fault_pairs)):
df_data = self.dataframe_loader(data)
df_faults = self.dataframe_loader(fault)
cycles_in_file = self.splitter.split(df_data, df_faults)
for j, ds in enumerate(cycles_in_file):
cycle_id = f"{i+1}_{j+1}"
self._build_and_store_cycle(
ds,
cycle_id,
metadata={
"Raw Data Filename": str(data.relative_to(common_path_prefix)),
"Raw Fault Filename": str(
fault.relative_to(common_path_prefix)
),
},
)
self.output_mode.finish()

def build_from_data_fault_pairs_files(
self, data_fault_pairs: Union[Tuple[str, str], List[Tuple[str, str]]]
) -> PDMDataset:
self.prepare_from_data_fault_pairs_files(data_fault_pairs)
return self.output_mode.build_dataset(self)

def prepare_from_df(
self, data: Union[pd.DataFrame, List[pd.DataFrame]]
) -> PDMDataset:
if not isinstance(data, list):
data = [data]
self._validate()
for i, data_element in enumerate(data):
for j, ds in enumerate(self.splitter.split(data_element)):
cycle_id = f"{i+1}_{j+1}"
self._build_and_store_cycle(ds, cycle_id)
self.output_mode.finish()

def build_from_df(self, data: Union[pd.DataFrame, List[pd.DataFrame]]):
self.prepare_from_df(data)
return self.output_mode.build_dataset(self)

def prepare_from_data_fault_pair(
self,
data_fault_pairs: Union[
Tuple[pd.DataFrame, pd.DataFrame], List[Tuple[pd.DataFrame, pd.DataFrame]]
],
):
if not isinstance(data_fault_pairs, list):
data_fault_pairs = [data_fault_pairs]

if not isinstance(self.splitter, FailureDataCycleSplitter):
raise ValueError(
"This method is only available for FailureDataCycleSplitter"
)
for i, (data, fault) in enumerate(tqdm(data_fault_pairs)):
cycles_in_file = self.splitter.split(data, fault)
for j, ds in enumerate(cycles_in_file):
cycle_id = f"{i+1}_{j+1}"
self._build_and_store_cycle(
ds,
cycle_id,
)
self.output_mode.finish()

def build_from_data_fault_pair(
self,
data_fault_pairs: Union[
Tuple[pd.DataFrame, pd.DataFrame], List[Tuple[pd.DataFrame, pd.DataFrame]]
],
) -> PDMDataset:
self.prepare_from_data_fault_pair(data_fault_pairs)
return self.output_mode.build_dataset(self)

def _build_and_store_cycle(
self, ds: pd.DataFrame, cycle_id: any, metadata: dict = {}
):
ds["RUL"] = self.rul_column.get(ds)
if self.index_column is not None:
ds.set_index(self.index_column, inplace=True)
self.output_mode.store(f"Cycle_{cycle_id}", ds, metadata)
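For context, here is a minimal usage sketch of the new DatasetBuilder, relying only on the fluent methods visible in this file. The concrete splitter, output-mode, and RUL-column objects are left as placeholders because their constructors live in modules not shown in this excerpt, and the "timestamp" index column is an assumed name:

```python
import pandas as pd

from ceruleo.dataset.builder.builder import DatasetBuilder
from ceruleo.dataset.builder.cycles_splitter import FailureDataCycleSplitter
from ceruleo.dataset.builder.output import OutputMode
from ceruleo.dataset.builder.rul_column import RULColumn

# Placeholders: the constructor signatures of these classes are not part of
# this diff, so they are deliberately left unspecified in this sketch.
splitter: FailureDataCycleSplitter = ...  # splits raw data into cycles using the fault list
output_mode: OutputMode = ...             # controls where/how each extracted cycle is stored
rul_column: RULColumn = ...               # computes the "RUL" target added to each cycle

builder = (
    DatasetBuilder()
    .set_splitting_method(splitter)
    .set_rul_column_method(rul_column)
    .set_index_column("timestamp")  # optional; assumed column name used as the cycle index
    .set_output_mode(output_mode)
)

# Build a PDMDataset from an in-memory (data, faults) DataFrame pair.
df_data: pd.DataFrame = ...    # raw sensor data
df_faults: pd.DataFrame = ...  # list of failures
dataset = builder.build_from_data_fault_pair([(df_data, df_faults)])
```

The builder validates that both a splitting method and an output mode were set before building, computes a RUL column for each extracted cycle, and delegates storage of each cycle to the configured output mode.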