🚀 Finishes the test implementation and associated tests.
Still have problems with random_state usage.
adriencrtrcap committed Aug 7, 2024
1 parent 8730f96 commit 33b95ce
Showing 2 changed files with 209 additions and 31 deletions.
203 changes: 177 additions & 26 deletions qolmat/analysis/holes_characterization.py
@@ -1,9 +1,11 @@
from abc import ABC, abstractmethod
from typing import Optional, Tuple, Union
from typing import List, Optional, Tuple, Union

from joblib import Parallel, delayed
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn import utils as sku
from scipy.stats import chi2

from qolmat.utils.exceptions import TooManyMissingPatterns
@@ -13,7 +15,14 @@
class McarTest(ABC):
"""
Abstract class for MCAR tests.
Parameters
----------
random_state : int, optional
The seed of the pseudo random number generator to use, for reproducibility.
"""
def __init__(self, random_state: Union[None, int, np.random.RandomState] = None):
self.rng = sku.check_random_state(random_state)

@abstractmethod
def test(self, df: pd.DataFrame) -> float:
@@ -29,7 +38,7 @@ class LittleTest(McarTest):
References
----------
Little. "A Test of Missing Completely at Random for Multivariate Data with Missing Values."
Journal of the American Statistical Association, Volume 83, 1988 - Issue 404
Journal of the American Statistical Association, Volume 83, 1988 - no 404, p. 1198-1202
Parameters
----------
@@ -46,13 +55,12 @@ def __init__(
imputer: Optional[ImputerEM] = None,
random_state: Union[None, int, np.random.RandomState] = None,
):
super().__init__()
super().__init__(random_state=random_state)
if imputer and imputer.model != "multinormal":
raise AttributeError(
"The ImputerEM model must be 'multinormal' to use the Little's test"
)
self.imputer = imputer
self.random_state = random_state

def test(self, df: pd.DataFrame) -> float:
"""
@@ -69,7 +77,7 @@ def test(self, df: pd.DataFrame) -> float:
float
The p-value of the test.
"""
imputer = self.imputer or ImputerEM(random_state=self.random_state)
imputer = self.imputer or ImputerEM(random_state=self.rng)
imputer = imputer._fit_element(df)

d0 = 0
@@ -98,9 +106,18 @@ def test(self, df: pd.DataFrame) -> float:

class PKLMTest(McarTest):
"""
PKLMTest extends McarTest for testing purposes.
Attributes:
This class implements the PKLM test, a fully non-parametric, easy-to-use, and powerful test
for the missing completely at random (MCAR) assumption on the missingness mechanism of a
dataset. The null hypothesis is "The missing data mechanism is MCAR".
This test is applicable to mixed data (quantitative and categorical features).
References
----------
Spohn, M. L., Näf, J., Michel, L., & Meinshausen, N. (2021). PKLM: A flexible MCAR test using
Classification. arXiv preprint arXiv:2109.10150.
Parameters
-----------
nb_projections : int
Number of projections.
@@ -123,17 +140,11 @@ def __init__(
exact_p_value: bool = False,
random_state: Union[None, int, np.random.RandomState] = None,
):
super().__init__()
super().__init__(random_state=random_state)
self.nb_projections = nb_projections
self.nb_permutation = nb_permutation
self.nb_trees_per_proj = nb_trees_per_proj
self.exact_p_value = exact_p_value
self.random_state = (
np.random.default_rng(random_state) if isinstance(
random_state,
(type(None), int)
) else random_state
)

@staticmethod
def _check_nb_patterns(df: np.ndarray) -> None:
@@ -144,10 +155,13 @@ def _check_nb_patterns(df: np.ndarray) -> None:
This condition comes from the PKLM paper, please see the reference if needed.
Parameters:
df (np.ndarray): 2D array with NaNs as missing values.
-----------
df : np.ndarray
2D array with NaNs as missing values.
Raises:
TooManyMissingPatterns: If unique missing patterns exceed the number of rows.
-------
TooManyMissingPatterns: If unique missing patterns exceed the number of rows.
"""
n_rows, _ = df.shape
indicator_matrix = ~np.isnan(df)
@@ -156,8 +170,7 @@ def _check_nb_patterns(df: np.ndarray) -> None:
if nb_patterns > n_rows:
raise TooManyMissingPatterns()

@staticmethod
def _draw_features_and_target_indexes(df: np.ndarray) -> Tuple[np.ndarray, int]:
def _draw_features_and_target_indexes(self, df: np.ndarray) -> Tuple[np.ndarray, int]:
"""
Randomly selects features and a target from the dataframe.
@@ -172,13 +185,13 @@ def _draw_features_and_target_indexes(df: np.ndarray) -> Tuple[np.ndarray, int]:
Indices of selected features and the target.
"""
_, p = df.shape
nb_features = np.random.randint(1, p)
features_idx = np.random.choice(range(p), size=nb_features, replace=False)
target_idx = np.random.choice(np.setdiff1d(np.arange(p), features_idx))
nb_features = self.rng.randint(1, p)
features_idx = self.rng.choice(range(p), size=nb_features, replace=False)
target_idx = self.rng.choice(np.setdiff1d(np.arange(p), features_idx))
return features_idx, target_idx
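The base class now seeds through `sklearn.utils.check_random_state`, which returns a legacy `numpy.random.RandomState` exposing the `randint`, `choice` and `permutation` methods used above (a `numpy.random.default_rng` Generator, as in the removed code, exposes `integers` instead of `randint`). A minimal sketch of the seeded drawing logic, mirroring the method above; the seed and the number of columns are illustrative:

```python
import numpy as np
from sklearn.utils import check_random_state

# check_random_state(42) returns a legacy np.random.RandomState
rng = check_random_state(42)

p = 5  # illustrative number of columns
nb_features = rng.randint(1, p)
features_idx = rng.choice(np.arange(p), size=nb_features, replace=False)
target_idx = rng.choice(np.setdiff1d(np.arange(p), features_idx))
```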

@staticmethod
def check_draw(df: np.ndarray, features_idx: np.ndarray, target_idx: int) -> np.bool_:
def _check_draw(df: np.ndarray, features_idx: np.ndarray, target_idx: int) -> np.bool_:
"""
Checks if the drawn features and target are valid.
# TODO : Need to develop.
@@ -202,7 +215,7 @@ def check_draw(df: np.ndarray, features_idx: np.ndarray, target_idx: int) -> np.
is_distinct_values = (~np.isnan(target_values)).any()
return is_nan and is_distinct_values

def draw_projection(self, df: np.ndarray) -> Tuple[np.ndarray, int]:
def _draw_projection(self, df: np.ndarray) -> Tuple[np.ndarray, int]:
"""
Draws a valid projection of features and a target.
@@ -219,7 +232,7 @@ def draw_projection(self, df: np.ndarray) -> Tuple[np.ndarray, int]:
is_checked = False
while not is_checked:
features_idx, target_idx = self._draw_features_and_target_indexes(df)
is_checked = self.check_draw(df, features_idx, target_idx)
is_checked = self._check_draw(df, features_idx, target_idx)
return features_idx, target_idx

@staticmethod
@@ -228,6 +241,25 @@ def _build_dataset(
features_idx: np.ndarray,
target_idx: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Builds a dataset by selecting specified features and target from a NumPy array,
excluding rows with NaN values in the feature columns.
Parameters:
-----------
df: np.ndarray
Input data array.
features_idx: np.ndarray
Indices of the feature columns.
target_idx: int
Index of the target column.
Returns:
--------
Tuple[np.ndarray, np.ndarray]: A tuple containing:
- X (np.ndarray): Array of selected features.
- y (np.ndarray): Binary array indicating presence of NaN (1) in the target column.
"""
X = df[~np.isnan(df[:, features_idx]).any(axis=1)][:, features_idx]
y = np.where(np.isnan(df[~np.isnan(df[:, features_idx]).any(axis=1)][:, target_idx]), 1, 0)
return X, y
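A toy walk-through of `_build_dataset`: rows with a NaN in any selected feature column are dropped, and the label marks whether the target column is missing in the rows that remain. The array below is made up purely for illustration:

```python
import numpy as np

df = np.array([
    [1.0,    2.0, np.nan],   # kept: features observed, target missing -> y = 1
    [np.nan, 3.0, 4.0],      # dropped: NaN in a feature column
    [5.0,    6.0, 7.0],      # kept: features observed, target present -> y = 0
])
features_idx = np.array([0, 1])
target_idx = 2

keep = ~np.isnan(df[:, features_idx]).any(axis=1)
X = df[keep][:, features_idx]                           # [[1., 2.], [5., 6.]]
y = np.where(np.isnan(df[keep][:, target_idx]), 1, 0)   # [1, 0]
```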
@@ -239,22 +271,71 @@ def _build_label(
features_idx: np.ndarray,
target_idx: int
) -> np.ndarray:
"""
Builds a label array by selecting target values from a permutation array,
excluding rows with NaN values in the specified feature columns.
Parameters:
-----------
df: np.ndarray
Input data array.
perm: np.ndarray
Permutation array from which labels are selected.
features_idx: np.ndarray
Indices of the feature columns.
target_idx: int
Index of the target column in the permutation array.
Returns:
--------
np.ndarray: Binary array indicating presence of NaN (1) in the target column.
"""
return perm[~np.isnan(df[:, features_idx]).any(axis=1), target_idx]


def _get_oob_probabilities(self, X: np.ndarray, y: np.ndarray) -> np.ndarray:
"""
Trains a RandomForestClassifier and retrieves out-of-bag (OOB) probabilities.
Parameters:
-----------
X: np.ndarray
Feature array for training.
y: np.ndarray
Target array for training.
Returns:
--------
np.ndarray: Out-of-bag probabilities for each class.
"""
clf = RandomForestClassifier(
n_estimators=self.nb_trees_per_proj,
#max_features=None,
min_samples_split=10,
bootstrap=True,
oob_score=True,
random_state=self.rng
)
clf.fit(X, y)
return clf.oob_decision_function_
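For context, `oob_decision_function_` is only populated because the classifier is fitted with `bootstrap=True` and `oob_score=True`; each row then gets class probabilities estimated from the trees that did not see it during bootstrapping. A self-contained sketch on toy data, with illustrative shapes and hyperparameters:

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier

rng = np.random.RandomState(0)
X = rng.normal(size=(100, 3))
y = (X[:, 0] > 0).astype(int)

clf = RandomForestClassifier(
    n_estimators=200, bootstrap=True, oob_score=True, random_state=rng
)
clf.fit(X, y)
oob_probabilities = clf.oob_decision_function_   # shape (100, 2): per-row OOB class probabilities
```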

@staticmethod
def _U_hat(oob_probabilities: np.ndarray, labels: np.ndarray) -> float:
"""
Computes the U_hat statistic, a measure of classifier performance, using
out-of-bag probabilities and true labels.
Parameters:
-----------
oob_probabilities: np.ndarray
Out-of-bag probabilities for each class.
labels: np.ndarray
True labels for the data.
Returns:
--------
float: The computed U_hat statistic.
"""
oob_probabilities = np.clip(oob_probabilities, 1e-9, 1-1e-9)

unique_labels = np.unique(labels)
@@ -281,5 +362,75 @@ def _U_hat(oob_probabilities: np.ndarray, labels: np.ndarray) -> float:

return u_0 + u_1
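Note that `_U_hat` clips the OOB probabilities to `[1e-9, 1 - 1e-9]` before taking logarithms, so a class that receives zero out-of-bag votes contributes a large but finite term instead of an infinite one; this is also where the `1e-9` constants in the expected values of the new unit tests come from. A minimal illustration of the effect of the clipping:

```python
import numpy as np

oob = np.array([[0.0, 1.0], [1.0, 0.0], [0.5, 0.5]])
clipped = np.clip(oob, 1e-9, 1 - 1e-9)
np.log(oob[:, 1] / oob[:, 0])          # [ inf, -inf, 0.]  (with divide/log warnings)
np.log(clipped[:, 1] / clipped[:, 0])  # [ 20.72..., -20.72..., 0.]  finite log-ratios
```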

def test(self, df: np.ndarray):
def _parallel_process_permutation(
self,
df: np.ndarray,
M_perm: np.ndarray,
features_idx: np.ndarray,
target_idx: int,
oob_probabilities: np.ndarray
) -> float:
y = self._build_label(df, M_perm, features_idx, target_idx)
return self._U_hat(oob_probabilities, y)

def _parallel_process_projection(
self,
df: np.ndarray,
list_permutations: List[np.ndarray],
features_idx: np.ndarray,
target_idx: int,
) -> Tuple[float, List[float]]:
X, y = self._build_dataset(df, features_idx, target_idx)
oob_probabilities = self._get_oob_probabilities(X, y)
u_hat = self._U_hat(oob_probabilities, y)
result_u_permutations = Parallel(n_jobs=-1)(delayed(self._parallel_process_permutation)(
df,
M_perm,
features_idx,
target_idx,
oob_probabilities
) for M_perm in list_permutations)
return u_hat, result_u_permutations

def test(self, df: np.ndarray) -> float:
"""
Apply the PKLM test over a real dataset.
Parameters
----------
df : np.ndarray
The input dataset with missing values.
Returns
-------
float
The p-value of the test.
"""
self._check_nb_patterns(df)

M = np.isnan(df).astype(int)
list_proj = [self._draw_projection(df) for _ in range(self.nb_projections)]
list_perm = [self.rng.permutation(M) for _ in range(self.nb_permutation)]
U = 0.
list_U_sigma = [0. for _ in range(self.nb_permutation)]

parallel_results = Parallel(n_jobs=-1)(delayed(self._parallel_process_projection)(
df,
list_perm,
features_idx,
target_idx
) for features_idx, target_idx in list_proj)

for U_projection, results in parallel_results:
U += U_projection
list_U_sigma = [x + y for x, y in zip(list_U_sigma, results)]

U = U / self.nb_projections
list_U_sigma = [x / self.nb_projections for x in list_U_sigma]  # average over projections, same scale as U

p_value = 1
for u_sigma in list_U_sigma:
if u_sigma >= U:
p_value += 1
return p_value / (self.nb_permutation + 1)
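The final loop above is a standard Monte-Carlo permutation p-value: writing $\bar{U}$ for the observed statistic averaged over the projections and $\bar{U}_\sigma$ for its counterpart computed on the $\sigma$-th permutation of the missingness matrix, with $B$ the value of `nb_permutation`, the returned quantity is

```latex
p = \frac{1 + \#\{\sigma : \bar{U}_\sigma \ge \bar{U}\}}{B + 1}
```

which is bounded below by $1/(B+1)$ and is therefore never exactly zero, as expected for a permutation test.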
37 changes: 32 additions & 5 deletions tests/analysis/test_holes_characterization.py
Expand Up @@ -90,8 +90,13 @@ def missingness_matrix_mcar_perm(missingness_matrix_mcar):
return rng.permutation(missingness_matrix_mcar)


@pytest.fixture
def oob_probabilities() -> np.ndarray:
return np.array([[0.5, 0.5], [0.0, 1.0], [1.0, 0.0], [1.0, 0.0]])


def test__draw_features_and_target_indexes(np_matrix_with_nan_mcar):
mcar_test_pklm = PKLMTest()
mcar_test_pklm = PKLMTest(random_state=42)
_, p = np_matrix_with_nan_mcar.shape
features_idx, target_idx = mcar_test_pklm._draw_features_and_target_indexes(np_matrix_with_nan_mcar)
assert isinstance(target_idx, np.integer)
@@ -108,10 +113,10 @@ def test__draw_features_and_target_indexes(np_matrix_with_nan_mcar):
("np_matrix_with_nan_mcar", np.array([1, 0, 2]), 3, False)
]
)
def test_check_draw(request, dataframe_fixture, features_idx, target_idx, expected):
def test__check_draw(request, dataframe_fixture, features_idx, target_idx, expected):
dataframe = request.getfixturevalue(dataframe_fixture)
mcar_test_pklm = PKLMTest()
result = mcar_test_pklm.check_draw(dataframe, features_idx, target_idx)
result = mcar_test_pklm._check_draw(dataframe, features_idx, target_idx)
assert result == expected


@@ -153,5 +158,27 @@ def test__build_label(
assert np.isin(label, [0, 1]).all()


def test__U_hat():
assert False
@pytest.mark.parametrize(
"oob_fixture, label",
[
("oob_probabilities", np.array([1, 1, 1, 1])),
("oob_probabilities", np.array([0, 0, 0, 0])),
]
)
def test__U_hat_unique_label(request, oob_fixture, label):
oob_prob = request.getfixturevalue(oob_fixture)
mcar_test_pklm = PKLMTest()
mcar_test_pklm._U_hat(oob_prob, label)


@pytest.mark.parametrize(
"oob_fixture, label, expected",
[
("oob_probabilities", np.array([1, 0, 0, 0]), 2/3*(np.log(1 - 1e-9) - np.log(1e-9))),
]
)
def test__U_hat_computation(request, oob_fixture, label, expected):
oob_prob = request.getfixturevalue(oob_fixture)
mcar_test_pklm = PKLMTest()
u_hat = mcar_test_pklm._U_hat(oob_prob, label)
assert round(u_hat, 2) == round(expected, 2)
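A minimal usage sketch of the API this commit completes; the synthetic DataFrame, the missingness rate, and the hyperparameter values below are illustrative only:

```python
import numpy as np
import pandas as pd
from qolmat.analysis.holes_characterization import LittleTest, PKLMTest

rng = np.random.default_rng(42)
df = pd.DataFrame(rng.normal(size=(200, 4)), columns=list("ABCD"))
df = df.mask(rng.random(df.shape) < 0.2)   # remove ~20% of values completely at random

# Little's test works on a DataFrame and assumes multivariate normal data.
little_pvalue = LittleTest(random_state=42).test(df)

# The PKLM test is non-parametric and, in this version, takes a NumPy array with NaNs.
pklm = PKLMTest(nb_projections=50, nb_permutation=30, nb_trees_per_proj=100, random_state=42)
pklm_pvalue = pklm.test(df.to_numpy())

# Under MCAR missingness, both p-values should typically be large.
print(little_pvalue, pklm_pvalue)
```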
