From ddb503a96a38b95bbca6d64d857e9c25dd4f2751 Mon Sep 17 00:00:00 2001
From: Vlasovets
Date: Mon, 11 Dec 2023 16:48:08 +0100
Subject: [PATCH] add docstrings to utils.py

---
 q2_gglasso/utils.py | 240 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 238 insertions(+), 2 deletions(-)

diff --git a/q2_gglasso/utils.py b/q2_gglasso/utils.py
index 7fd7c02..0f97c7e 100644
--- a/q2_gglasso/utils.py
+++ b/q2_gglasso/utils.py
@@ -5,12 +5,30 @@
 def flatten_array(x):
+    """
+    Flatten a NumPy array.
+
+    Parameters:
+    - x: The input array.
+
+    Returns:
+    - np.ndarray: A flattened version of the input array.
+    """
     x = np.array(x)
     x = x.flatten()

     return x


 def list_to_array(x=list):
+    """
+    Convert a list to a NumPy array.
+
+    Parameters:
+    - x (list): The input list.
+
+    Returns:
+    - np.ndarray or scalar: A NumPy array if the list has more than one element, or a scalar if it has only one element.
+    """
     if isinstance(x, list):
         x = np.array(x)
@@ -20,11 +38,30 @@
 def numeric_to_list(x):
+    """
+    Convert a numeric value or None to a list.
+
+    Parameters:
+    - x (int, float, or None): The input value.
+
+    Returns:
+    - list: A list containing the input value. If the input is already a list or None, it is returned as is.
+    """
     if (isinstance(x, (int, float))) or (x is None):
         x = [x]

     return x


 def if_equal_dict(a, b):
+    """
+    Check whether the values for each key in two dictionaries are equal.
+
+    Parameters:
+    - a (dict): The first dictionary.
+    - b (dict): The second dictionary.
+
+    Returns:
+    - bool: True if the values for each key are equal, False otherwise.
+    """
     x = True
     for key in a.keys():
         if a[key].all() == b[key].all():
             x = False
@@ -35,6 +72,16 @@
 def pep_metric(matrix: pd.DataFrame):
+    """
+    Calculate the Positive Edge Proportion (PEP) metric for a given adjacency matrix,
+    i.e., the ratio of the number of positive edges to the total number of edges.
+
+    Parameters:
+    - matrix (pd.DataFrame): The adjacency matrix representing interactions between nodes.
+
+    Returns:
+    - float: The Positive Edge Proportion (PEP) metric, rounded to two decimal places.
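+
+    Example (illustrative, assuming a symmetric matrix with a zero diagonal):
+        A 3x3 matrix with one positive edge (0.5) and one negative edge (-0.2)
+        contains 2 edges in total, 1 of them positive, so the PEP is 0.5:
+
+        m = pd.DataFrame([[0.0, 0.5, -0.2], [0.5, 0.0, 0.0], [-0.2, 0.0, 0.0]])
+        pep_metric(m)  # 0.5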
+    """
     total_edges = np.count_nonzero(matrix) / 2
     positive_edges = np.sum(matrix > 0, axis=0)
     total_positives = np.sum(positive_edges) / 2
@@ -43,6 +90,15 @@
 def if_2d_array(x=np.ndarray):
+    """
+    Ensure the input array is 2D.
+
+    Parameters:
+    - x (numpy.ndarray): The input array.
+
+    Returns:
+    - numpy.ndarray: The input array as a 2D array.
+    """
     # if 3d array of shape (1,p,p),
     # make it 2d array of shape (p,p).
     if x.shape[0] == 1:
@@ -51,6 +107,19 @@
 def if_all_none(lambda1, lambda2, mu1):
+    """
+    Check whether all hyperparameters (lambda1, lambda2, mu1) are None and set default values if needed.
+
+    Parameters:
+    - lambda1: The value or list of values for lambda1.
+    - lambda2: The value or list of values for lambda2.
+    - mu1: The value or list of values for mu1.
+
+    Returns:
+    - tuple: A tuple containing updated values for lambda1, lambda2, and mu1.
+
+    If all hyperparameters are None, default values are set and a message is printed.
+    """
     if lambda1 is None and lambda2 is None and mu1 is None:
         lambda1 = np.logspace(0, -3, 10)
         lambda2 = np.logspace(-1, -4, 5)
@@ -65,6 +134,17 @@
 def if_model_selection(lambda1, lambda2, mu1):
+    """
+    Check whether model selection is enabled based on the provided lambda and mu values.
+
+    Parameters:
+    - lambda1: The value or list of values for lambda1.
+    - lambda2: The value or list of values for lambda2.
+    - mu1: The value or list of values for mu1.
+
+    Returns:
+    - bool: True if model selection is enabled (multiple values for lambda1, lambda2, or mu1), False otherwise.
+    """
     lambda1 = numeric_to_list(lambda1)
     lambda2 = numeric_to_list(lambda2)
     mu1 = numeric_to_list(mu1)
@@ -77,6 +157,19 @@
 def get_seq_depth(counts):
+    """
+    Calculate and scale sequencing depth from count data.
+
+    Parameters:
+    - counts (numpy.ndarray or pandas.DataFrame): A 2D array or DataFrame where rows represent features and columns represent samples.
+
+    Returns:
+    - numpy.ndarray: Scaled sequencing depth values.
+
+    The sequencing depth is calculated by summing counts across features or samples, whichever is the larger dimension.
+    The depth values are then scaled to the range [0, 1].
+    """
+
     p, n = counts.shape
     if p >= n:
         depth = counts.sum(axis=0)
@@ -87,6 +180,22 @@
 def get_range(lower_bound, upper_bound, n):
+    """
+    Generate a logarithmic range of values between lower_bound and upper_bound.
+
+    Parameters:
+    - lower_bound (float or None): The lower bound of the range, interpreted by np.logspace as a base-10 exponent. If None, a default value of 1e-3 is used.
+    - upper_bound (float or None): The upper bound of the range, also interpreted as a base-10 exponent. If None, a default value of 1 is used.
+    - n (int): The number of values to generate in the logarithmic range.
+
+    Returns:
+    - list: A list of n logarithmically spaced values between 10**lower_bound and 10**upper_bound.
+      If both lower_bound and upper_bound are None, the list contains a single element [None].
+
+    Example:
+        get_range(-3, 1, 5)
+        [0.001, 0.01, 0.1, 1.0, 10.0]
+    """
     if (lower_bound is None) and (upper_bound is None):
         range = [None]
     else:
@@ -94,12 +203,34 @@
         if lower_bound is None:
            lower_bound = 1e-3
         if upper_bound is None:
            upper_bound = 1
-        range = np.linspace(lower_bound, upper_bound, n)
+        range = np.logspace(lower_bound, upper_bound, n)

     return range


 def get_hyperparameters(lambda1_min, lambda1_max, lambda2_min, lambda2_max, mu1_min, mu1_max, n_lambda1: int = 1, n_lambda2: int = 1, n_mu1: int = 1):
+    """
+    Generate hyperparameters for a model based on specified ranges.
+
+    Parameters:
+    - lambda1_min (float): The minimum value for lambda1.
+    - lambda1_max (float): The maximum value for lambda1.
+    - lambda2_min (float): The minimum value for lambda2.
+    - lambda2_max (float): The maximum value for lambda2.
+    - mu1_min (float): The minimum value for mu1.
+    - mu1_max (float): The maximum value for mu1.
+    - n_lambda1 (int, optional): The number of values to generate for lambda1 (default is 1).
+    - n_lambda2 (int, optional): The number of values to generate for lambda2 (default is 1).
+    - n_mu1 (int, optional): The number of values to generate for mu1 (default is 1).
+
+    Returns:
+    - dict: A dictionary containing model hyperparameters:
+      - 'model_selection' (bool): True if there are multiple values for any hyperparameter, False if all hyperparameters have a single value.
+      - 'lambda1' (float or array): The generated values for lambda1.
+      - 'lambda2' (float or array): The generated values for lambda2.
+      - 'mu1' (float or array): The generated values for mu1.
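+
+    Example (illustrative; the min/max bounds are base-10 exponents, see get_range):
+        hp = get_hyperparameters(-3, 0, None, None, None, None, n_lambda1=10)
+        # hp['lambda1'] holds 10 values from 1e-3 to 1, so hp['model_selection'] is True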
+    """
+
     lambda1 = get_range(lower_bound=lambda1_min, upper_bound=lambda1_max, n=n_lambda1)
     lambda2 = get_range(lower_bound=lambda2_min, upper_bound=lambda2_max, n=n_lambda2)
     mu1 = get_range(lower_bound=mu1_min, upper_bound=mu1_max, n=n_mu1)
@@ -129,6 +260,19 @@
 def get_lambda_mask(adapt_lambda1: list, covariance_matrix: pd.DataFrame):
+    """
+    Generate a lambda mask based on adaptive lambda values.
+
+    Parameters:
+    - adapt_lambda1 (list): A list containing pairs of strings and corresponding lambda values to adapt.
+      The strings represent patterns to match in index and column labels of the covariance_matrix.
+      The lambda values are applied to the elements in the covariance_matrix that match the patterns.
+    - covariance_matrix (pd.DataFrame): The covariance matrix to which adaptive lambda values will be applied.
+
+    Returns:
+    - np.ndarray: A masked version of the covariance matrix with adaptive lambda values.
+    """
+
     mask = np.ones(covariance_matrix.shape)
     adapt_dict = {adapt_lambda1[i]: adapt_lambda1[i + 1] for i in range(0, len(adapt_lambda1), 2)}
@@ -145,6 +289,20 @@
 def check_lambda_path(P, mgl_problem=False):
+    """
+    Check whether the optimal lambda values lie on the edges of their respective intervals.
+
+    Parameters:
+    - P: The problem instance containing model selection parameters and statistics.
+    - mgl_problem (bool, optional): Indicates whether the problem is a multiple graphical lasso (MGL) problem (default is False).
+
+    Returns:
+    - bool: True if the optimal lambda values are on the edges of their intervals, False otherwise.
+
+    Warnings:
+    - Issues warnings if the optimal lambda values are on the edge of their intervals.
+
+    """
     sol_par = P.__dict__["modelselect_params"]
     lambda1_opt = P.modelselect_stats["BEST"]["lambda1"]
     lambda1_min = sol_par["lambda1_range"].min()
@@ -216,6 +374,16 @@ def log_transform(X, transformation=str, eps=0.1):
 def zero_imputation(df: pd.DataFrame, pseudo_count: int = 1):
+    """
+    Perform zero imputation on a DataFrame by adding a pseudo count to zero values and scaling.
+
+    Parameters:
+    - df (pd.DataFrame): The input DataFrame with potentially zero values.
+    - pseudo_count (int, optional): The pseudo count added to zero values (default is 1).
+
+    Returns:
+    - pd.DataFrame: The DataFrame after zero imputation.
+    """
     X = df.copy()
     original_sum = X.sum(axis=0)  # sum in a sample (axis=0 if p, N matrix)
     for col in X.columns:
@@ -228,6 +396,12 @@
 def remove_biom_header(file_path):
+    """
+    Remove the header line from a BIOM file.
+
+    Parameters:
+    - file_path (str): The path to the BIOM file.
+    """
     with open(str(file_path), 'r') as fin:
         data = fin.read().splitlines(True)
     with open(str(file_path), 'w') as fout:
@@ -235,6 +409,15 @@
 def calculate_seq_depth(data=pd.DataFrame):
+    """
+    Calculate and scale sequencing depth from count data.
+
+    Parameters:
+    - data (pd.DataFrame): A DataFrame where rows represent samples and columns represent features.
+
+    Returns:
+    - pd.DataFrame: A DataFrame containing scaled sequencing depth values.
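+
+    Example (illustrative):
+        df = pd.DataFrame([[50, 50], [150, 150], [250, 250]])
+        calculate_seq_depth(df)  # row sums 100, 300, 500 scale to 0.0, 0.5, 1.0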
+    """
     x = data.sum(axis=1)
     x_scaled = (x - x.min()) / (x.max() - x.min())
     seq_depth = pd.DataFrame(data=x_scaled, columns=["sequencing depth"])
@@ -242,6 +425,18 @@
 def single_hyperparameters(model_selection, lambda1, lambda2=None, mu1=None):
+    """
+    Convert hyperparameters to single values if model selection is not enabled.
+
+    Parameters:
+    - model_selection (bool): Indicates whether model selection is enabled.
+    - lambda1: The value or list of values for lambda1.
+    - lambda2: The value or list of values for lambda2 (default is None).
+    - mu1: The value or list of values for mu1 (default is None).
+
+    Returns:
+    - tuple: A tuple containing single values for lambda1, lambda2, and mu1 if model selection is not enabled.
+    """
     if model_selection is False:
         lambda1 = np.array(lambda1).item()
         lambda2 = np.array(lambda2).item()
@@ -251,7 +446,13 @@
 def to_zarr(obj, name, root, first=True):
     """
-    Function for converting a GGLasso object to a zarr file, a with tree structue.
+    Convert a GGLasso object to a zarr file with a tree structure.
+
+    Parameters:
+    - obj: The GGLasso object or dictionary to be converted.
+    - name (str): The name to use for the current level in the zarr hierarchy.
+    - root (zarr.Group): The root group in which to create the zarr hierarchy.
+    - first (bool, optional): Indicates whether it is the first level (default is True).
     """
     # name 'S' is dedicated for some internal usage in zarr notation and cannot be accessed as a key while reading
     if name == "S":
@@ -285,6 +486,20 @@
 def PCA(X, L, inverse=True):
+    """
+    Perform Principal Component Analysis (PCA).
+
+    Parameters:
+    - X (pd.DataFrame or np.ndarray): The input data.
+    - L (np.ndarray): The low-rank matrix whose eigendecomposition defines the principal components.
+    - inverse (bool, optional): If True, perform inverse PCA (default is True).
+
+    Returns:
+    - tuple: A tuple containing the PCA results:
+      - np.ndarray: The projected data.
+      - np.ndarray: The loadings matrix.
+      - np.ndarray: The eigenvalues.
+    """
     sig, V = np.linalg.eigh(L)

     # sort eigenvalues in descending order
@@ -306,6 +521,27 @@
 def correlated_PC(data=pd.DataFrame, metadata=pd.DataFrame, low_rank=np.ndarray, corr_bound=float, alpha: float = 0.05):
+    """
+    Identify and analyze correlated principal components based on Spearman correlation.
+
+    Parameters:
+    - data (pd.DataFrame): The input data with features.
+    - metadata (pd.DataFrame): The metadata used for correlation analysis.
+    - low_rank (np.ndarray): The low-rank matrix for Principal Component Analysis (PCA).
+    - corr_bound (float): The absolute correlation threshold for considering a correlation as significant.
+    - alpha (float, optional): The significance level for hypothesis testing (default is 0.05).
+
+    Returns:
+    - dict: A dictionary containing information about correlated principal components for each metadata column:
+      - key (str): The metadata column name.
+      - value (dict): Sub-dictionary with information about correlated principal components:
+        - "PC j" (str): The j-th principal component.
+        - "data" (pd.DataFrame): The data used for analysis.
+        - "eigenvalue" (float): The eigenvalue of the principal component.
+        - "rho" (float): Spearman correlation coefficient.
+        - "p_value" (float): P-value for the correlation test.
+    """
+
     proj_dict = dict()
     seq_depth = calculate_seq_depth(data)
     r = np.linalg.matrix_rank(low_rank)