
Commit

add docstring to utils.py
Vlasovets committed Dec 11, 2023
1 parent 16a41f3 commit ddb503a
Showing 1 changed file with 238 additions and 2 deletions.
240 changes: 238 additions & 2 deletions q2_gglasso/utils.py
@@ -5,12 +5,30 @@


def flatten_array(x):
    """
    Flatten a NumPy array.

    Parameters:
    - x: The input array.

    Returns:
    - np.ndarray: A flattened version of the input array.
    """
    x = np.array(x)
    x = x.flatten()
    return x
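
For example (a quick sketch, assuming NumPy is imported as np):

>>> flatten_array([[1, 2], [3, 4]])
array([1, 2, 3, 4])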


def list_to_array(x=list):
    """
    Convert a list to a NumPy array.

    Parameters:
    - x (list): The input list.

    Returns:
    - np.ndarray or scalar: A NumPy array if the list has more than one element,
      or a scalar if it has only one element.
    """
    if isinstance(x, list):
        x = np.array(x)
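
Per the docstring (the single-element branch is collapsed above), the expected behavior is:

>>> list_to_array([1, 2, 3])
array([1, 2, 3])
>>> list_to_array([5])
5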

@@ -20,11 +38,30 @@ def list_to_array(x=list):


def numeric_to_list(x):
    """
    Convert a numeric value or None to a list.

    Parameters:
    - x (int, float, or None): The input value.

    Returns:
    - list: A list containing the input value. A numeric value or None is
      wrapped in a single-element list; any other input (e.g. an existing
      list) is returned as is.
    """
    if (isinstance(x, (int, float))) or (x is None):
        x = [x]
    return x
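
A quick example of the wrapping behavior (note that None is wrapped as well):

>>> numeric_to_list(0.5)
[0.5]
>>> numeric_to_list(None)
[None]
>>> numeric_to_list([1, 2])
[1, 2]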

def if_equal_dict(a, b):
    """
    Check if the values for each key in two dictionaries are equal.

    Parameters:
    - a (dict): The first dictionary.
    - b (dict): The second dictionary.

    Returns:
    - bool: True if the values for each key are equal, False otherwise.
    """
    x = True
    for key in a.keys():
        if a[key].all() == b[key].all():
@@ -35,6 +72,16 @@ def if_equal_dict(a, b):


def pep_metric(matrix: pd.DataFrame):
    """
    Calculate the Positive Edge Proportion (PEP) metric for a given adjacency matrix.

    The PEP is the ratio of the number of positive edges to the total number of edges.

    Parameters:
    - matrix (pd.DataFrame): The adjacency matrix representing interactions between nodes.

    Returns:
    - float: The Positive Edge Proportion (PEP) metric, rounded to two decimal places.
    """
    total_edges = np.count_nonzero(matrix) / 2
    positive_edges = np.sum(matrix > 0, axis=0)
    total_positives = np.sum(positive_edges) / 2
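
A small worked example of the metric (the final division is collapsed above, but per the docstring it returns total_positives / total_edges rounded to two decimals):

A = pd.DataFrame([[0.0, 0.4, -0.2],
                  [0.4, 0.0, 0.0],
                  [-0.2, 0.0, 0.0]])
# 4 nonzero off-diagonal entries -> total_edges = 2
# 2 of them positive -> total_positives = 1
# pep_metric(A) would then be 0.5
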
@@ -43,6 +90,15 @@ def pep_metric(matrix: pd.DataFrame):


def if_2d_array(x=np.ndarray):
    """
    Ensure the input array is 2D.

    Parameters:
    - x (numpy.ndarray): The input array.

    Returns:
    - numpy.ndarray: The input array as a 2D array.
    """
    # if a 3d array has shape (1, p, p), make it a 2d array of shape (p, p)
    if x.shape[0] == 1:
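
For instance (assuming the collapsed line reshapes exactly as the comment above describes):

x = np.ones((1, 3, 3))
if_2d_array(x).shape  # -> (3, 3)
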
@@ -51,6 +107,19 @@ def if_2d_array(x=np.ndarray):


def if_all_none(lambda1, lambda2, mu1):
    """
    Check if all hyperparameters (lambda1, lambda2, mu1) are None and set default values if needed.

    Parameters:
    - lambda1: The value or list of values for lambda1.
    - lambda2: The value or list of values for lambda2.
    - mu1: The value or list of values for mu1.

    Returns:
    - tuple: A tuple containing updated values for lambda1, lambda2, and mu1.
      If all hyperparameters are None, default values are set and a message is printed.
    """
    if lambda1 is None and lambda2 is None and mu1 is None:
        lambda1 = np.logspace(0, -3, 10)
        lambda2 = np.logspace(-1, -4, 5)
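
With all three arguments None, the defaults above give a 10-point lambda1 grid from 1 down to 1e-3 and a 5-point lambda2 grid from 1e-1 down to 1e-4 (mu1's default is collapsed above):

lambda1, lambda2, mu1 = if_all_none(None, None, None)
# lambda1 -> np.logspace(0, -3, 10), i.e. 1.0 ... 0.001
# lambda2 -> np.logspace(-1, -4, 5), i.e. 0.1 ... 0.0001
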
@@ -65,6 +134,17 @@ def if_all_none(lambda1, lambda2, mu1):


def if_model_selection(lambda1, lambda2, mu1):
    """
    Check if model selection is enabled based on the provided lambda and mu values.

    Parameters:
    - lambda1: The value or list of values for lambda1.
    - lambda2: The value or list of values for lambda2.
    - mu1: The value or list of values for mu1.

    Returns:
    - bool: True if model selection is enabled (multiple values for lambda1,
      lambda2, or mu1), False otherwise.
    """
    lambda1 = numeric_to_list(lambda1)
    lambda2 = numeric_to_list(lambda2)
    mu1 = numeric_to_list(mu1)
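
Per the docstring (the length check itself is collapsed above), the expected behavior is:

if_model_selection(0.1, 0.01, 0.05)          # -> False: single values everywhere
if_model_selection([0.1, 0.2], 0.01, 0.05)   # -> True: a grid for lambda1
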
@@ -77,6 +157,19 @@ def if_model_selection(lambda1, lambda2, mu1):


def get_seq_depth(counts):
    """
    Calculate and scale sequencing depth from count data.

    Parameters:
    - counts (numpy.ndarray or pandas.DataFrame): A 2D array or DataFrame where
      rows represent features and columns represent samples.

    Returns:
    - numpy.ndarray: Scaled sequencing depth values.

    The sequencing depth is calculated by summing counts across features or samples,
    depending on which dimension is larger. The depth values are then scaled to the
    range [0, 1].
    """
    p, n = counts.shape
    if p >= n:
        depth = counts.sum(axis=0)
@@ -87,19 +180,57 @@ def get_seq_depth(counts):


def get_range(lower_bound, upper_bound, n):
    """
    Generate a logarithmic range of values between lower_bound and upper_bound.

    Parameters:
    - lower_bound (float or None): The lower bound of the range, interpreted as a
      base-10 exponent. If None, a default value of 1e-3 is used.
    - upper_bound (float or None): The upper bound of the range, interpreted as a
      base-10 exponent. If None, a default value of 1 is used.
    - n (int): The number of values to generate in the logarithmic range.

    Returns:
    - list: A list of n logarithmically spaced values from 10**lower_bound to
      10**upper_bound. If both lower_bound and upper_bound are None, the list
      contains the single element [None].

    Example:
    >>> get_range(0, 2, 5)
    array([  1.        ,   3.16227766,  10.        ,  31.6227766 , 100.        ])
    """
    if (lower_bound is None) and (upper_bound is None):
        range = [None]
    else:
        if lower_bound is None:
            lower_bound = 1e-3
        if upper_bound is None:
            upper_bound = 1
-        range = np.linspace(lower_bound, upper_bound, n)
+        range = np.logspace(lower_bound, upper_bound, n)
    return range


def get_hyperparameters(lambda1_min, lambda1_max, lambda2_min, lambda2_max, mu1_min, mu1_max,
                        n_lambda1: int = 1, n_lambda2: int = 1, n_mu1: int = 1):
    """
    Generate hyperparameters for a model based on specified ranges.

    Parameters:
    - lambda1_min (float): The minimum value for lambda1.
    - lambda1_max (float): The maximum value for lambda1.
    - lambda2_min (float): The minimum value for lambda2.
    - lambda2_max (float): The maximum value for lambda2.
    - mu1_min (float): The minimum value for mu1.
    - mu1_max (float): The maximum value for mu1.
    - n_lambda1 (int, optional): The number of values to generate for lambda1 (default is 1).
    - n_lambda2 (int, optional): The number of values to generate for lambda2 (default is 1).
    - n_mu1 (int, optional): The number of values to generate for mu1 (default is 1).

    Returns:
    - dict: A dictionary containing model hyperparameters:
      - 'model_selection' (bool): True if any hyperparameter has multiple values,
        False if all hyperparameters have a single value.
      - 'lambda1' (float or array): The generated values for lambda1.
      - 'lambda2' (float or array): The generated values for lambda2.
      - 'mu1' (float or array): The generated values for mu1.
    """
    lambda1 = get_range(lower_bound=lambda1_min, upper_bound=lambda1_max, n=n_lambda1)
    lambda2 = get_range(lower_bound=lambda2_min, upper_bound=lambda2_max, n=n_lambda2)
    mu1 = get_range(lower_bound=mu1_min, upper_bound=mu1_max, n=n_mu1)
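
A hedged usage sketch (the dictionary assembly is collapsed above; per the docstring it bundles the generated ranges with a model_selection flag):

hp = get_hyperparameters(lambda1_min=-3, lambda1_max=0, lambda2_min=None, lambda2_max=None,
                         mu1_min=None, mu1_max=None, n_lambda1=10)
# hp['lambda1'] -> np.logspace(-3, 0, 10); lambda2 and mu1 remain [None]
# hp['model_selection'] -> True, since lambda1 carries multiple values
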
@@ -129,6 +260,19 @@ def get_hyperparameters(lambda1_min, lambda1_max, lambda2_min, lambda2_max, mu1_


def get_lambda_mask(adapt_lambda1: list, covariance_matrix: pd.DataFrame):
    """
    Generate a lambda mask based on adaptive lambda values.

    Parameters:
    - adapt_lambda1 (list): A list containing pairs of strings and corresponding
      lambda values to adapt. The strings represent patterns to match in index and
      column labels of the covariance_matrix; the lambda values are applied to the
      matching elements.
    - covariance_matrix (pd.DataFrame): The covariance matrix to which adaptive
      lambda values will be applied.

    Returns:
    - np.ndarray: A masked version of the covariance matrix with adaptive lambda values.
    """
    mask = np.ones(covariance_matrix.shape)
    adapt_dict = {adapt_lambda1[i]: adapt_lambda1[i + 1] for i in range(0, len(adapt_lambda1), 2)}
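
A hedged sketch of the intended call (the pattern-matching loop is collapsed above; the labels here are hypothetical):

cov = pd.DataFrame(np.eye(3), index=["ASV_1", "ASV_2", "pH"], columns=["ASV_1", "ASV_2", "pH"])
mask = get_lambda_mask(adapt_lambda1=["ASV", 0.1], covariance_matrix=cov)
# entries whose row/column labels match "ASV" would receive lambda 0.1;
# all other entries keep the default mask value of 1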

@@ -145,6 +289,20 @@ def get_lambda_mask(adapt_lambda1: list, covariance_matrix: pd.DataFrame):


def check_lambda_path(P, mgl_problem=False):
    """
    Check if the optimal lambda values are on the edges of their respective intervals.

    Parameters:
    - P: The problem instance containing model selection parameters and statistics.
    - mgl_problem (bool, optional): Indicates whether the problem is a multiple
      graphical lasso problem (default is False).

    Returns:
    - bool: True if the optimal lambda values are on the edges of their intervals,
      False otherwise.

    Warnings:
    - Issues warnings if the optimal lambda values are on the edge of their intervals.
    """
    sol_par = P.__dict__["modelselect_params"]
    lambda1_opt = P.modelselect_stats["BEST"]["lambda1"]
    lambda1_min = sol_par["lambda1_range"].min()
@@ -216,6 +374,16 @@ def log_transform(X, transformation=str, eps=0.1):


def zero_imputation(df: pd.DataFrame, pseudo_count: int = 1):
    """
    Perform zero imputation on a DataFrame by adding a pseudo count to zero values and scaling.

    Parameters:
    - df (pd.DataFrame): The input DataFrame with potentially zero values.
    - pseudo_count (int, optional): The pseudo count added to zero values (default is 1).

    Returns:
    - pd.DataFrame: The DataFrame after zero imputation.
    """
    X = df.copy()
    original_sum = X.sum(axis=0)  # sum within a sample (axis=0 for a (p, N) matrix)
    for col in X.columns:
@@ -228,20 +396,47 @@ def zero_imputation(df: pd.DataFrame, pseudo_count: int = 1):


def remove_biom_header(file_path):
    """
    Remove the header line from a BIOM file.

    Parameters:
    - file_path (str): The path to the BIOM file.
    """
    with open(str(file_path), 'r') as fin:
        data = fin.read().splitlines(True)
    with open(str(file_path), 'w') as fout:
        fout.writelines(data[1:])
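
Usage is a single in-place rewrite (the path is hypothetical):

remove_biom_header("feature-table.tsv")  # drops the first line, e.g. '# Constructed from biom file'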


def calculate_seq_depth(data=pd.DataFrame):
    """
    Calculate and scale sequencing depth from count data.

    Parameters:
    - data (pd.DataFrame): A DataFrame where rows represent samples and columns represent features.

    Returns:
    - pd.DataFrame: A DataFrame containing scaled sequencing depth values.
    """
    x = data.sum(axis=1)
    x_scaled = (x - x.min()) / (x.max() - x.min())
    seq_depth = pd.DataFrame(data=x_scaled, columns=["sequencing depth"])
    return seq_depth
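
For example, min-max scaling maps the smallest depth to 0 and the largest to 1:

df = pd.DataFrame({"taxon_a": [10, 0, 5], "taxon_b": [0, 20, 5]})
calculate_seq_depth(df)
# row sums are 10, 20, 10 -> scaled depths 0.0, 1.0, 0.0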


def single_hyperparameters(model_selection, lambda1, lambda2=None, mu1=None):
    """
    Convert hyperparameters to single values if model selection is not enabled.

    Parameters:
    - model_selection (bool): Indicates whether model selection is enabled.
    - lambda1: The value or list of values for lambda1.
    - lambda2: The value or list of values for lambda2 (default is None).
    - mu1: The value or list of values for mu1 (default is None).

    Returns:
    - tuple: A tuple containing single values for lambda1, lambda2, and mu1
      if model selection is not enabled.
    """
    if model_selection is False:
        lambda1 = np.array(lambda1).item()
        lambda2 = np.array(lambda2).item()
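
For example (the return is collapsed above, but per the docstring a tuple comes back):

single_hyperparameters(False, [0.1], [0.01], [0.05])
# -> (0.1, 0.01, 0.05): one-element arrays collapse to scalars via .item()
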
@@ -251,7 +446,13 @@ def single_hyperparameters(model_selection, lambda1, lambda2=None, mu1=None):

def to_zarr(obj, name, root, first=True):
    """
-    Function for converting a GGLasso object to a zarr file, a with tree structue.
+    Convert a GGLasso object to a zarr file with a tree structure.

    Parameters:
    - obj: The GGLasso object or dictionary to be converted.
    - name (str): The name to use for the current level in the zarr hierarchy.
    - root (zarr.Group): The root group to create the zarr hierarchy in.
    - first (bool, optional): Indicates whether this is the first level (default is True).
    """
    # the name 'S' is reserved for internal usage in zarr notation
    # and cannot be accessed as a key while reading
    if name == "S":
@@ -285,6 +486,20 @@


def PCA(X, L, inverse=True):
    """
    Perform Principal Component Analysis (PCA).

    Parameters:
    - X (pd.DataFrame or np.ndarray): The input data.
    - L (np.ndarray): The low-rank matrix used in PCA.
    - inverse (bool, optional): If True, perform inverse PCA (default is True).

    Returns:
    - tuple: A tuple containing the PCA results:
      - np.ndarray: The projected data.
      - np.ndarray: The loadings matrix.
      - np.ndarray: The eigenvalues.
    """
    sig, V = np.linalg.eigh(L)

    # sort eigenvalues in descending order

def correlated_PC(data=pd.DataFrame, metadata=pd.DataFrame, low_rank=np.ndarray,
                  corr_bound=float, alpha: float = 0.05):
    """
    Identify and analyze correlated principal components based on Spearman correlation.

    Parameters:
    - data (pd.DataFrame): The input data with features.
    - metadata (pd.DataFrame): The metadata used for correlation analysis.
    - low_rank (np.ndarray): The low-rank matrix for Principal Component Analysis (PCA).
    - corr_bound (float): The absolute correlation threshold for considering a correlation as significant.
    - alpha (float, optional): The significance level for hypothesis testing (default is 0.05).

    Returns:
    - dict: A dictionary with information about correlated principal components for each metadata column:
      - key (str): The metadata column name.
      - value (dict): Sub-dictionary with information about the correlated principal components:
        - "PC j" (str): The j-th principal component.
        - "data" (pd.DataFrame): The data used for analysis.
        - "eigenvalue" (float): The eigenvalue of the principal component.
        - "rho" (float): The Spearman correlation coefficient.
        - "p_value" (float): The p-value of the correlation test.
    """
    proj_dict = dict()
    seq_depth = calculate_seq_depth(data)
    r = np.linalg.matrix_rank(low_rank)
