feat/parallelisation #166

Open · wants to merge 3 commits into base: fix/quickstart
10 changes: 5 additions & 5 deletions docs/index.rst
@@ -22,14 +22,14 @@
 .. toctree::
    :maxdepth: 2
    :hidden:
-   :caption: API
+   :caption: ANALYSIS
 
-   api
+   analysis
+   examples/tutorials/plot_tuto_mcar
 
 .. toctree::
    :maxdepth: 2
    :hidden:
-   :caption: ANALYSIS
+   :caption: API
 
-   analysis
-   examples/tutorials/plot_tuto_mcar
+   api
249 changes: 188 additions & 61 deletions qolmat/benchmark/comparator.py
@@ -1,10 +1,11 @@
 """Script for comparator."""
 
 import logging
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
 import numpy as np
 import pandas as pd
+from joblib import Parallel, cpu_count, delayed
 
 from qolmat.benchmark import hyperparameters, metrics
 from qolmat.benchmark.missing_patterns import _HoleGenerator
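
For readers new to joblib, the pattern this PR builds on is `Parallel`/`delayed`: wrap the function call in `delayed`, pass the resulting generator to a `Parallel` instance, and collect the ordered list of results. A minimal, self-contained sketch (illustration only, not part of the diff):

# Minimal joblib pattern used throughout this PR: fan a function out
# over a list of tasks and gather the results in task order.
from joblib import Parallel, cpu_count, delayed

def square(x: int) -> int:
    return x * x

tasks = list(range(8))
# Never launch more workers than there are tasks (the same heuristic
# as get_optimal_n_jobs further down).
n_jobs = min(cpu_count(), len(tasks))
results = Parallel(n_jobs=n_jobs)(delayed(square)(x) for x in tasks)
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]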
@@ -93,99 +94,225 @@ def get_errors(
         df_errors = pd.concat(dict_errors.values(), keys=dict_errors.keys())
         return df_errors
 
-    def evaluate_errors_sample(
-        self,
-        imputer: Any,
-        df: pd.DataFrame,
-        dict_config_opti_imputer: Dict[str, Any] = {},
-        metric_optim: str = "mse",
-    ) -> pd.Series:
-        """Evaluate the errors in the cross-validation.
+    def process_split(
+        self, split_data: Tuple[int, pd.DataFrame, pd.DataFrame]
+    ) -> pd.DataFrame:
+        """Process a split.
 
         Parameters
         ----------
-        imputer : Any
-            imputation model
-        df : pd.DataFrame
-            dataframe to impute
-        dict_config_opti_imputer : Dict
-            search space for tested_model's hyperparameters
-        metric_optim : str
-            Loss function used when imputers undergo hyperparameter
-            optimization
+        split_data : Tuple
+            contains (split_idx, df_mask, df_origin)
 
         Returns
         -------
-        pd.Series
-            Series with the errors for each metric and each variable
+        pd.DataFrame
+            errors results
 
         """
-        list_errors = []
-        df_origin = df[self.selected_columns].copy()
-        for df_mask in self.generator_holes.split(df_origin):
-            df_corrupted = df_origin.copy()
-            df_corrupted[df_mask] = np.nan
+        _, df_mask, df_origin = split_data
+        df_with_holes = df_origin.copy()
+        df_with_holes[df_mask] = np.nan
+
+        subset = self.generator_holes.subset
+        if subset is None:
+            raise ValueError(
+                "HoleGenerator `subset` should be overwritten in split "
+                "but it is none!"
+            )
+
+        split_results = {}
+        for imputer_name, imputer in self.dict_imputers.items():
+            dict_config_opti_imputer = self.dict_config_opti.get(
+                imputer_name, {}
+            )
+
             imputer_opti = hyperparameters.optimize(
                 imputer,
-                df,
+                df_origin,
                 self.generator_holes,
-                metric_optim,
+                self.metric_optim,
                 dict_config_opti_imputer,
                 max_evals=self.max_evals,
                 verbose=self.verbose,
             )
-            df_imputed = imputer_opti.fit_transform(df_corrupted)
-            subset = self.generator_holes.subset
-            if subset is None:
-                raise ValueError(
-                    "HoleGenerator `subset` should be overwritten in split "
-                    "but it is none!"
-                )
-            df_errors = self.get_errors(
+
+            df_imputed = imputer_opti.fit_transform(df_with_holes)
+            errors = self.get_errors(
                 df_origin[subset], df_imputed[subset], df_mask[subset]
             )
+            split_results[imputer_name] = errors
+
+        return pd.concat(split_results, axis=1)
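
The hole-punching at the top of `process_split` relies on pandas boolean-mask assignment: indexing a DataFrame with a same-shaped boolean DataFrame addresses exactly the `True` cells. A toy illustration (the data is invented):

# Toy version of the masking step in process_split.
import numpy as np
import pandas as pd

df_origin = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [4.0, 5.0, 6.0]})
df_mask = pd.DataFrame(
    {"a": [False, True, False], "b": [True, False, False]}
)

df_with_holes = df_origin.copy()
df_with_holes[df_mask] = np.nan  # True cells become missing
print(df_with_holes)
#      a    b
# 0  1.0  NaN
# 1  NaN  5.0
# 2  3.0  6.0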

+    def process_imputer(
+        self, imputer_data: Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
+    ) -> Tuple[str, pd.DataFrame]:
+        """Process an imputer.
+
+        Parameters
+        ----------
+        imputer_data : Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
+            contains (imputer_name, imputer, all_masks, df_origin)
+
+        Returns
+        -------
+        Tuple[str, pd.DataFrame]
+            imputer name, errors results
+
+        """
+        imputer_name, imputer, all_masks, df_origin = imputer_data
+
+        subset = self.generator_holes.subset
+        if subset is None:
+            raise ValueError(
+                "HoleGenerator `subset` should be overwritten in split "
+                "but it is none!"
+            )
+
+        dict_config_opti_imputer = self.dict_config_opti.get(imputer_name, {})
+        imputer_opti = hyperparameters.optimize(
+            imputer,
+            df_origin,
+            self.generator_holes,
+            self.metric_optim,
+            dict_config_opti_imputer,
+            max_evals=self.max_evals,
+            verbose=self.verbose,
+        )
+
+        imputer_results = []
+        for i, df_mask in enumerate(all_masks):
+            df_with_holes = df_origin.copy()
+            df_with_holes[df_mask] = np.nan
+            df_imputed = imputer_opti.fit_transform(df_with_holes)
+            errors = self.get_errors(
+                df_origin[subset], df_imputed[subset], df_mask[subset]
+            )
-            list_errors.append(df_errors)
-        df_errors = pd.DataFrame(list_errors)
-        errors_mean = df_errors.mean(axis=0)
+            imputer_results.append(errors)
 
-        return errors_mean
+        return imputer_name, pd.concat(imputer_results).groupby(
+            level=[0, 1]
+        ).mean()
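
The return statement of `process_imputer` averages the per-split error frames, which share a (metric, column) MultiIndex: concatenating them stacks duplicate index pairs, and grouping on both levels takes the mean across splits. A toy version with invented numbers:

# Stand-alone sketch of the closing aggregation in process_imputer.
import pandas as pd

idx = pd.MultiIndex.from_product([["mse"], ["a", "b"]])
errors_split_1 = pd.Series([1.0, 3.0], index=idx).to_frame("imputer")
errors_split_2 = pd.Series([2.0, 5.0], index=idx).to_frame("imputer")

mean_errors = (
    pd.concat([errors_split_1, errors_split_2])
    .groupby(level=[0, 1])
    .mean()
)
print(mean_errors)
#        imputer
# mse a      1.5
#     b      4.0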

     def compare(
         self,
-        df: pd.DataFrame,
-    ):
-        """Compure different imputation methods on dataframe df.
+        df_origin: pd.DataFrame,
+        use_parallel: bool = True,
+        n_jobs: int = -1,
+        parallel_over: str = "auto",
+    ) -> pd.DataFrame:
+        """Compare imputers in parallel, with hyperparameter optimisation.
 
         Parameters
         ----------
-        df : pd.DataFrame
-            input dataframe (for comparison)
+        df_origin : pd.DataFrame
+            input dataframe with missing values
+        use_parallel : bool, optional
+            whether to parallelise the evaluation, by default True
+        n_jobs : int, optional
+            number of jobs to use for the parallelisation, by default -1
+        parallel_over : str, optional
+            'auto', 'splits' or 'imputers', by default "auto"
 
         Returns
         -------
         pd.DataFrame
-            Dataframe with the metrics results, imputers are in columns
-            and indices represent metrics and variables.
+            DataFrame of results with a 2-level index.
+            Columns are imputers; index level 0 holds the metrics,
+            index level 1 the column names.
 
         """
-        dict_errors = {}
-
-        for name, imputer in self.dict_imputers.items():
-            dict_config_opti_imputer = self.dict_config_opti.get(name, {})
-
-            try:
-                logging.info(f"Testing model: {name}...")
-                dict_errors[name] = self.evaluate_errors_sample(
-                    imputer, df, dict_config_opti_imputer, self.metric_optim
-                )
-                logging.info("done.")
-            except Exception as excp:
-                logging.info(
-                    f"Error while testing {name} of type "
-                    f"{type(imputer).__name__}!"
-                )
-                raise excp
-
-        df_errors = pd.DataFrame(dict_errors)
-        return df_errors
+        logging.info(
+            f"Starting comparison for {len(self.dict_imputers)} imputers."
+        )
+
+        all_splits = list(self.generator_holes.split(df_origin))
+
+        if parallel_over == "auto":
+            parallel_over = (
+                "splits"
+                if len(all_splits) > len(self.dict_imputers)
+                else "imputers"
+            )
+
+        if use_parallel:
+            logging.info(f"Parallelisation over: {parallel_over}...")
+            if parallel_over == "splits":
+                split_data = [
+                    (i, df_mask, df_origin)
+                    for i, df_mask in enumerate(all_splits)
+                ]
+                n_jobs = self.get_optimal_n_jobs(split_data, n_jobs)
+                results = Parallel(n_jobs=n_jobs)(
+                    delayed(self.process_split)(data) for data in split_data
+                )
+                final_results = pd.concat(results).groupby(level=[0, 1]).mean()
+            elif parallel_over == "imputers":
+                imputer_data = [
+                    (name, imputer, all_splits, df_origin)
+                    for name, imputer in self.dict_imputers.items()
+                ]
+                n_jobs = self.get_optimal_n_jobs(imputer_data, n_jobs)
+                results = Parallel(n_jobs=n_jobs)(
+                    delayed(self.process_imputer)(data)
+                    for data in imputer_data
+                )
+                final_results = pd.concat(dict(results), axis=1)
+            else:
+                raise ValueError(
+                    "`parallel_over` should be `auto`, `splits` or "
+                    "`imputers`."
+                )
+        else:
+            logging.info("Sequential treatment...")
+            if parallel_over == "splits":
+                split_data = [
+                    (i, df_mask, df_origin)
+                    for i, df_mask in enumerate(all_splits)
+                ]
+                results = [self.process_split(data) for data in split_data]
+                final_results = pd.concat(results).groupby(level=[0, 1]).mean()
+            elif parallel_over == "imputers":
+                imputer_data = [
+                    (name, imputer, all_splits, df_origin)
+                    for name, imputer in self.dict_imputers.items()
+                ]
+                results = [
+                    self.process_imputer(data) for data in imputer_data
+                ]
+                final_results = pd.concat(dict(results), axis=1)
+            else:
+                raise ValueError(
+                    "`parallel_over` should be `auto`, `splits` or "
+                    "`imputers`."
+                )
+
+        logging.info("Comparison successfully terminated.")
+        return final_results
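
Put together, the reworked `compare` would be driven as below. The imputer and hole-generator setup is our assumption for illustration (qolmat ships `ImputerMean` and `UniformHoleGenerator`, but their exact constructor arguments are not shown in this diff); only the `compare(...)` keywords come from this PR:

# Hypothetical end-to-end usage of the new compare() signature.
import pandas as pd
from qolmat.benchmark.comparator import Comparator
from qolmat.benchmark.missing_patterns import UniformHoleGenerator
from qolmat.imputations.imputers import ImputerMean

df = pd.DataFrame(
    {"a": [1.0, 2.0, None, 4.0], "b": [1.0, None, 3.0, 4.0]}
)
generator = UniformHoleGenerator(n_splits=4, ratio_masked=0.2)
comparator = Comparator(
    {"mean": ImputerMean()},  # dict_imputers
    ["a", "b"],               # selected_columns
    generator_holes=generator,
)
# Parallelise over splits with two workers; parallel_over="auto" would
# pick "splits" here as well, since 4 splits > 1 imputer.
results = comparator.compare(
    df, use_parallel=True, n_jobs=2, parallel_over="splits"
)
print(results)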

+    @staticmethod
+    def get_optimal_n_jobs(split_data: List, n_jobs: int = -1) -> int:
+        """Determine the optimal number of parallel jobs to use.
+
+        If `n_jobs` is specified by the user, that value is used.
+        Otherwise, the function returns the minimum of the number of
+        CPU cores and the number of tasks (i.e. the length of
+        `split_data`), ensuring that no more jobs than tasks are
+        launched.
+
+        Parameters
+        ----------
+        split_data : List
+            A collection of data to be processed in parallel.
+            The length of this collection determines the number of tasks.
+        n_jobs : int
+            The number of jobs (parallel workers) to use, by default -1
+
+        Returns
+        -------
+        int
+            The optimal number of jobs to run in parallel
+
+        """
+        return min(cpu_count(), len(split_data)) if n_jobs == -1 else n_jobs
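
Since `get_optimal_n_jobs` is a `@staticmethod`, its two branches can be sanity-checked without building a `Comparator` (assuming this branch is installed):

# -1 resolves to min(CPU cores, number of tasks); explicit values pass
# through unchanged.
from qolmat.benchmark.comparator import Comparator

tasks = ["split_0", "split_1", "split_2"]
print(Comparator.get_optimal_n_jobs(tasks))            # min(cores, 3)
print(Comparator.get_optimal_n_jobs(tasks, n_jobs=2))  # 2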