diff --git a/pyproject.toml b/pyproject.toml index ab44e40..aa754bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "regmod" -version = "0.2.1" +version = "0.2.2" description = "General regression models" readme = "README.rst" requires-python = ">=3.10" @@ -18,8 +18,8 @@ dependencies = [ "pandas", "xspline==0.0.7", "msca", - "jax[cpu]==0.4.5", - "jaxlib==0.4.4", + "jax", + "jaxlib", ] [project.optional-dependencies] diff --git a/src/regmod/_typing.py b/src/regmod/_typing.py new file mode 100644 index 0000000..f7ef3c8 --- /dev/null +++ b/src/regmod/_typing.py @@ -0,0 +1,7 @@ +from collections.abc import Iterable +from typing import Any, Callable + +from jax import Array as JaxArray +from msca.linalg.matrix import Matrix +from numpy.typing import ArrayLike, NDArray +from pandas import DataFrame diff --git a/src/regmod/composite_models/base.py b/src/regmod/composite_models/base.py index 2b1f476..ddcdca3 100644 --- a/src/regmod/composite_models/base.py +++ b/src/regmod/composite_models/base.py @@ -1,6 +1,7 @@ """ Base Model """ + import logging from copy import deepcopy from typing import Dict, List, Optional @@ -19,15 +20,9 @@ logger = logging.getLogger(__name__) link_funs = { - "gaussian": fun_dict[ - GaussianModel.default_param_specs["mu"]["inv_link"] - ].inv_fun, - "poisson": fun_dict[ - PoissonModel.default_param_specs["lam"]["inv_link"] - ].inv_fun, - "binomial": fun_dict[ - BinomialModel.default_param_specs["p"]["inv_link"] - ].inv_fun, + "gaussian": fun_dict[GaussianModel.default_param_specs["mu"]["inv_link"]].inv_fun, + "poisson": fun_dict[PoissonModel.default_param_specs["lam"]["inv_link"]].inv_fun, + "binomial": fun_dict[BinomialModel.default_param_specs["p"]["inv_link"]].inv_fun, } model_constructors = { @@ -86,19 +81,23 @@ class BaseModel(NodeModel): Overwrite the append function in NodeModel. """ - def __init__(self, - name: str, - y: str, - variables: List[Variable], - df: Optional[pd.DataFrame] = None, - weights: str = "weights", - mtype: str = "gaussian", - prior_mask: Optional[Dict] = None, - **param_specs): + def __init__( + self, + name: str, + y: str, + variables: List[Variable], + df: Optional[DataFrame] = None, + weights: str = "weights", + mtype: str = "gaussian", + prior_mask: Optional[Dict] = None, + **param_specs, + ): super().__init__(name) - if any(mtype not in model_config - for model_config in (link_funs, model_constructors)): + if any( + mtype not in model_config + for model_config in (link_funs, model_constructors) + ): raise ValueError(f"Not supported model type {mtype}") data = deepcopy(data) variables = list(deepcopy(variables)) @@ -108,9 +107,7 @@ def __init__(self, self.df = df self.weights = weights self.variables = {v.name: v for v in variables} - self.param_specs = {"variables": variables, - "use_offset": True, - **param_specs} + self.param_specs = {"variables": variables, "use_offset": True, **param_specs} self.model = None self.prior_mask = {} if prior_mask is None else prior_mask @@ -149,10 +146,7 @@ def fit(self, **fit_options): if self.model is None: model_constructor = model_constructors[self.mtype] self.model = model_constructor( - self.y, - df=self.df, - weights=self.weights, - param_specs=self.param_specs + self.y, df=self.df, weights=self.weights, param_specs=self.param_specs ) self.model.fit(**fit_options) message = f"fit_node;finish;{self.level};{self.name};" @@ -179,17 +173,19 @@ def get_draws(self, df: DataFrame = None, size: int = 1000) -> DataFrame: pred_data = self.model.df.copy() pred_data.attach_df(df) - coefs_draws = np.random.multivariate_normal(self.model.opt_coefs, - self.model.opt_vcov, - size=size) - draws = np.vstack([ - self.model.params[0].get_param(coefs_draw, pred_data) - for coefs_draw in coefs_draws - ]) - df_draws = pd.DataFrame( + coefs_draws = np.random.multivariate_normal( + self.model.opt_coefs, self.model.opt_vcov, size=size + ) + draws = np.vstack( + [ + self.model.params[0].get_param(coefs_draw, pred_data) + for coefs_draw in coefs_draws + ] + ) + df_draws = DataFrame( draws.T, columns=[f"{self.col_value}_{i}" for i in range(size)], - index=df.index + index=df.index, ) return pd.concat([df, df_draws], axis=1) @@ -216,8 +212,7 @@ def get_posterior(self) -> Dict: # use minimum standard deviation of the posterior distribution sd = np.maximum(0.1, np.sqrt(np.diag(self.model.opt_vcov))) vnames = [v.name for v in self.param_specs["variables"]] - slices = sizes_to_slices([self.variables[name].size - for name in vnames]) + slices = sizes_to_slices([self.variables[name].size for name in vnames]) return { name: GaussianPrior(mean=mean[slices[i]], sd=sd[slices[i]]) for i, name in enumerate(vnames) @@ -240,8 +235,7 @@ def append(self, node: NodeModel, rank: int = 0): primary children. """ if rank >= 1: - raise ValueError(f"{type(self).__name__} can only have primary " - "link.") + raise ValueError(f"{type(self).__name__} can only have primary " "link.") super().append(node, rank=rank) def __repr__(self) -> str: diff --git a/src/regmod/function.py b/src/regmod/function.py index 0b113f3..e2e333a 100644 --- a/src/regmod/function.py +++ b/src/regmod/function.py @@ -1,11 +1,13 @@ """ Function module """ + from dataclasses import dataclass, field -from typing import Callable import numpy as np +from regmod._typing import Callable + @dataclass class SmoothFunction: @@ -25,6 +27,7 @@ class SmoothFunction: Derivative of the function. d2fun : Callable Second derivative of the function. + """ name: str @@ -60,8 +63,8 @@ def exp_d2fun(x): def expit_fun(x): neg_indices = x < 0 - z = np.exp(-np.sqrt(x*x)) - y = 1/(1 + z) + z = np.exp(-np.sqrt(x * x)) + y = 1 / (1 + z) if np.isscalar(x): if neg_indices: y = 1 - y @@ -71,15 +74,15 @@ def expit_fun(x): def expit_dfun(x): - z = np.exp(-np.sqrt(x*x)) - y = z/(1 + z)**2 + z = np.exp(-np.sqrt(x * x)) + y = z / (1 + z) ** 2 return y def expit_d2fun(x): neg_indices = x < 0 - z = np.exp(-np.sqrt(x*x)) - y = z*(z - 1)/(z + 1)**3 + z = np.exp(-np.sqrt(x * x)) + y = z * (z - 1) / (z + 1) ** 3 if np.isscalar(x): if neg_indices: y = -y @@ -93,55 +96,54 @@ def log_fun(x): def log_dfun(x): - return 1/x + return 1 / x def log_d2fun(x): - return -1/x**2 + return -1 / x**2 def logit_fun(x): - return np.log(x/(1 - x)) + return np.log(x / (1 - x)) def logit_dfun(x): - return 1/(x*(1 - x)) + return 1 / (x * (1 - x)) def logit_d2fun(x): - return (2*x - 1)/(x*(1 - x))**2 + return (2 * x - 1) / (x * (1 - x)) ** 2 fun_list = [ - SmoothFunction(name="identity", - fun=identity_fun, - inv_fun=identity_fun, - dfun=identity_dfun, - d2fun=identity_d2fun), - SmoothFunction(name="exp", - fun=exp_fun, - inv_fun=log_fun, - dfun=exp_dfun, - d2fun=exp_d2fun), - SmoothFunction(name="expit", - fun=expit_fun, - inv_fun=logit_fun, - dfun=expit_dfun, - d2fun=expit_d2fun), - SmoothFunction(name="log", - fun=log_fun, - inv_fun=exp_fun, - dfun=log_dfun, - d2fun=log_d2fun), - SmoothFunction(name="logit", - fun=logit_fun, - inv_fun=expit_fun, - dfun=logit_dfun, - d2fun=logit_d2fun), + SmoothFunction( + name="identity", + fun=identity_fun, + inv_fun=identity_fun, + dfun=identity_dfun, + d2fun=identity_d2fun, + ), + SmoothFunction( + name="exp", fun=exp_fun, inv_fun=log_fun, dfun=exp_dfun, d2fun=exp_d2fun + ), + SmoothFunction( + name="expit", + fun=expit_fun, + inv_fun=logit_fun, + dfun=expit_dfun, + d2fun=expit_d2fun, + ), + SmoothFunction( + name="log", fun=log_fun, inv_fun=exp_fun, dfun=log_dfun, d2fun=log_d2fun + ), + SmoothFunction( + name="logit", + fun=logit_fun, + inv_fun=expit_fun, + dfun=logit_dfun, + d2fun=logit_d2fun, + ), ] -fun_dict = { - fun.name: fun - for fun in fun_list -} +fun_dict = {fun.name: fun for fun in fun_list} diff --git a/src/regmod/models/binomial.py b/src/regmod/models/binomial.py index ccbf815..8e2df73 100644 --- a/src/regmod/models/binomial.py +++ b/src/regmod/models/binomial.py @@ -1,13 +1,11 @@ """ Binomial Model """ -from typing import Callable, List, Tuple import numpy as np -import pandas as pd -from numpy.typing import NDArray from scipy.stats import binom +from regmod._typing import Callable, DataFrame, NDArray from regmod.optimizer import msca_optimize from .model import Model @@ -18,11 +16,12 @@ class BinomialModel(Model): param_names = ("p",) default_param_specs = {"p": {"inv_link": "expit"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not np.all((self.y >= 0) & (self.y <= 1)): - raise ValueError("Binomial model requires observations to be " - "between zero and one.") + raise ValueError( + "Binomial model requires observations to be " "between zero and one." + ) self.mat[0], self.cmat, self.cvec = model_post_init( self.mat[0], self.uvec, self.linear_umat, self.linear_uvec ) @@ -37,17 +36,15 @@ def objective(self, coefs: NDArray) -> float: ------- float Objective value. + """ inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) - weights = self.weights*self.trim_weights + weights = self.weights * self.trim_weights obj_param = -weights * ( - self.y * np.log(param) + - (1 - self.y) * np.log(1 - param) + self.y * np.log(param) + (1 - self.y) * np.log(1 - param) ) return obj_param.sum() + self.objective_from_gprior(coefs) @@ -63,19 +60,16 @@ def gradient(self, coefs: NDArray) -> NDArray: ------- NDArray Gradient vector. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights - grad_param = weights * ( - (param - self.y) / (param*(1 - param)) * dparam - ) + weights = self.weights * self.trim_weights + grad_param = weights * ((param - self.y) / (param * (1 - param)) * dparam) return mat.T.dot(grad_param) + self.gradient_from_gprior(coefs) @@ -91,20 +85,19 @@ def hessian(self, coefs: NDArray) -> NDArray: ------- NDArray Hessian matrix. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) d2param = inv_link.d2fun(lin_param) - weights = self.weights*self.trim_weights + weights = self.weights * self.trim_weights hess_param = weights * ( - (self.y / param**2 + (1 - self.y) / (1 - param)**2) * dparam**2 + - (param - self.y) / (param*(1 - param)) * d2param + (self.y / param**2 + (1 - self.y) / (1 - param) ** 2) * dparam**2 + + (param - self.y) / (param * (1 - param)) * d2param ) scaled_mat = mat.scale_rows(hess_param) @@ -124,49 +117,41 @@ def jacobian2(self, coefs: NDArray) -> NDArray: ------- NDArray Jacobian matrix. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights - grad_param = weights * ( - (param - self.y) / (param*(1 - param)) * dparam - ) + weights = self.weights * self.trim_weights + grad_param = weights * ((param - self.y) / (param * (1 - param)) * dparam) jacobian = mat.T.scale_cols(grad_param) hess_mat_gprior = type(jacobian)(self.hessian_from_gprior()) jacobian2 = jacobian.dot(jacobian.T) + hess_mat_gprior return jacobian2 - def fit(self, - optimizer: Callable = msca_optimize, - **optimizer_options): + def fit(self, optimizer: Callable = msca_optimize, **optimizer_options): """Fit function. Parameters ---------- optimizer : Callable, optional Model solver, by default scipy_optimize. + """ - super().fit( - optimizer=optimizer, - **optimizer_options - ) + super().fit(optimizer=optimizer, **optimizer_options) - def nll(self, params: List[NDArray]) -> NDArray: - return -(self.y*np.log(params[0]) + (1 - self.y)*np.log(1.0 - params[0])) + def nll(self, params: list[NDArray]) -> NDArray: + return -(self.y * np.log(params[0]) + (1 - self.y) * np.log(1.0 - params[0])) - def dnll(self, params: List[NDArray]) -> List[NDArray]: - return [-(self.y/params[0] - (1 - self.y)/(1.0 - params[0]))] + def dnll(self, params: list[NDArray]) -> list[NDArray]: + return [-(self.y / params[0] - (1 - self.y) / (1.0 - params[0]))] - def d2nll(self, params: List[NDArray]) -> List[List[NDArray]]: - return [[self.y/params[0]**2 + (1 - self.y)/(1.0 - params[0])**2]] + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: + return [[self.y / params[0] ** 2 + (1 - self.y) / (1.0 - params[0]) ** 2]] - def get_ui(self, params: List[NDArray], bounds: Tuple[float, float]) -> NDArray: + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: p = params[0] n = self.y_sample_sizes - return [binom.ppf(bounds[0], n=n, p=p), - binom.ppf(bounds[1], n=n, p=p)] + return [binom.ppf(bounds[0], n=n, p=p), binom.ppf(bounds[1], n=n, p=p)] diff --git a/src/regmod/models/gaussian.py b/src/regmod/models/gaussian.py index ad2bc0b..faa6416 100644 --- a/src/regmod/models/gaussian.py +++ b/src/regmod/models/gaussian.py @@ -1,13 +1,11 @@ """ Gaussian Model """ -from typing import Callable, List, Tuple import numpy as np -import pandas as pd -from numpy.typing import NDArray from scipy.stats import norm +from regmod._typing import Callable, DataFrame, NDArray from regmod.optimizer import msca_optimize from .model import Model @@ -18,7 +16,7 @@ class GaussianModel(Model): param_names = ("mu",) default_param_specs = {"mu": {"inv_link": "identity"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) self.mat[0], self.cmat, self.cvec = model_post_init( self.mat[0], self.uvec, self.linear_umat, self.linear_uvec @@ -34,17 +32,14 @@ def objective(self, coefs: NDArray) -> float: ------- float Objective value. + """ inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) - weights = self.weights*self.trim_weights - obj_param = weights * 0.5 * ( - param - self.y - )**2 + weights = self.weights * self.trim_weights + obj_param = weights * 0.5 * (param - self.y) ** 2 return obj_param.sum() + self.objective_from_gprior(coefs) def gradient(self, coefs: NDArray) -> NDArray: @@ -59,19 +54,16 @@ def gradient(self, coefs: NDArray) -> NDArray: ------- NDArray Gradient vector. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights - grad_param = weights * ( - param - self.y - ) * dparam + weights = self.weights * self.trim_weights + grad_param = weights * (param - self.y) * dparam return mat.T.dot(grad_param) + self.gradient_from_gprior(coefs) @@ -87,20 +79,17 @@ def hessian(self, coefs: NDArray) -> NDArray: ------- NDArray Hessian matrix. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) d2param = inv_link.d2fun(lin_param) - weights = self.weights*self.trim_weights - hess_param = weights * ( - dparam**2 + (param - self.y)*d2param - ) + weights = self.weights * self.trim_weights + hess_param = weights * (dparam**2 + (param - self.y) * d2param) scaled_mat = mat.scale_rows(hess_param) hess_mat = mat.T.dot(scaled_mat) @@ -119,47 +108,44 @@ def jacobian2(self, coefs: NDArray) -> NDArray: ------- NDArray Jacobian matrix. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights + weights = self.weights * self.trim_weights grad_param = weights * (param - self.y) * dparam jacobian = mat.T.scale_cols(grad_param) hess_mat_gprior = type(jacobian)(self.hessian_from_gprior()) jacobian2 = jacobian.dot(jacobian.T) + hess_mat_gprior return jacobian2 - def fit(self, - optimizer: Callable = msca_optimize, - **optimizer_options): + def fit(self, optimizer: Callable = msca_optimize, **optimizer_options): """Fit function. Parameters ---------- optimizer : Callable, optional Model solver, by default scipy_optimize. + """ - super().fit( - optimizer=optimizer, - **optimizer_options - ) + super().fit(optimizer=optimizer, **optimizer_options) - def nll(self, params: List[NDArray]) -> NDArray: - return 0.5*(params[0] - self.y)**2 + def nll(self, params: list[NDArray]) -> NDArray: + return 0.5 * (params[0] - self.y) ** 2 - def dnll(self, params: List[NDArray]) -> List[NDArray]: + def dnll(self, params: list[NDArray]) -> list[NDArray]: return [params[0] - self.y] - def d2nll(self, params: List[NDArray]) -> List[NDArray]: + def d2nll(self, params: list[NDArray]) -> list[NDArray]: return [[np.ones(self.df.shape[0])]] - def get_ui(self, params: List[NDArray], bounds: Tuple[float, float]) -> NDArray: + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: mean = params[0] - sd = 1.0/np.sqrt(self.weights) - return [norm.ppf(bounds[0], loc=mean, scale=sd), - norm.ppf(bounds[1], loc=mean, scale=sd)] + sd = 1.0 / np.sqrt(self.weights) + return [ + norm.ppf(bounds[0], loc=mean, scale=sd), + norm.ppf(bounds[1], loc=mean, scale=sd), + ] diff --git a/src/regmod/models/model.py b/src/regmod/models/model.py index 795edc1..a3ce91f 100644 --- a/src/regmod/models/model.py +++ b/src/regmod/models/model.py @@ -1,15 +1,12 @@ """ Model module """ -from typing import Callable, Dict, List, Optional, Tuple, Union import numpy as np -import pandas as pd -from msca.linalg.matrix import Matrix -from numpy import ndarray from scipy.linalg import block_diag from scipy.sparse import csc_matrix +from regmod._typing import Callable, DataFrame, Matrix, NDArray from regmod.optimizer import scipy_optimize from regmod.parameter import Parameter from regmod.utils import sizes_to_slices @@ -26,10 +23,10 @@ class Model: Column name for the weight of each observation data Data frame that contains all information - params : Optional[List[Parameter]], optional + params : list[Parameter], optional A list of parameters. Default to None. - param_specs : Optional[Dict[str, Dict]], optional - Dictionary of all parameter specifications. Default to None. + param_specs : dict[str, dict], optional + dictionary of all parameter specifications. Default to None. Raises ------ @@ -40,30 +37,34 @@ class Model: """ - param_names: Tuple[str] = None - default_param_specs: Dict[str, Dict] = None - - def __init__(self, - y: str, - weights: str = "weights", - df: Optional[pd.DataFrame] = None, - params: Optional[List[Parameter]] = None, - param_specs: Optional[Dict[str, Dict]] = None): + param_names: tuple[str] = None + default_param_specs: dict[str, dict] = None + + def __init__( + self, + y: str, + weights: str = "weights", + df: DataFrame | None = None, + params: list[Parameter] | None = None, + param_specs: dict[str, dict] | None = None, + ): if params is None and param_specs is None: raise ValueError("Must provide `params` or `param_specs`") if params is not None: - param_dict = { - param.name: param - for param in params - } - self.params = [param_dict[param_name] - for param_name in self.param_names] + param_dict = {param.name: param for param in params} + self.params = [param_dict[param_name] for param_name in self.param_names] else: - self.params = [Parameter(param_name, - **{**self.default_param_specs[param_name], - **param_specs[param_name]}) - for param_name in self.param_names] + self.params = [ + Parameter( + param_name, + **{ + **self.default_param_specs[param_name], + **param_specs[param_name], + }, + ) + for param_name in self.param_names + ] self._y = y self._weights = weights self.df = df @@ -83,7 +84,7 @@ def __init__(self, self._opt_coefs = None self._opt_vcov = None - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): self.df = df for param in self.params: param.check_data(df) @@ -104,34 +105,36 @@ def attach_df(self, df: pd.DataFrame): self.weights = self.df[self._weights].to_numpy() @property - def opt_coefs(self) -> Union[None, ndarray]: + def opt_coefs(self) -> NDArray | None: return self._opt_coefs @opt_coefs.setter - def opt_coefs(self, coefs: ndarray): + def opt_coefs(self, coefs: NDArray): coefs = np.asarray(coefs) if coefs.size != self.size: raise ValueError("Coefficients size not match.") self._opt_coefs = coefs @property - def opt_vcov(self) -> Union[None, ndarray]: + def opt_vcov(self) -> NDArray | None: return self._opt_vcov @opt_vcov.setter - def opt_vcov(self, vcov: ndarray): + def opt_vcov(self, vcov: NDArray): vcov = np.asarray(vcov) self._opt_vcov = vcov - def get_vcov(self, coefs: ndarray) -> ndarray: + def get_vcov(self, coefs: NDArray) -> NDArray: hessian = self.hessian(coefs) if isinstance(hessian, Matrix): hessian = hessian.to_numpy() - #We probably don't want to be eigendecomposing + # We probably don't want to be eigendecomposing eig_vals, eig_vecs = np.linalg.eigh(hessian) if np.isclose(eig_vals, 0.0).any(): - raise ValueError("singular Hessian matrix, please add priors or " - "reduce number of variables") + raise ValueError( + "singular Hessian matrix, please add priors or " + "reduce number of variables" + ) inv_hessian = (eig_vecs / eig_vals).dot(eig_vecs.T) jacobian2 = self.jacobian2(coefs) @@ -139,392 +142,424 @@ def get_vcov(self, coefs: ndarray) -> ndarray: jacobian2 = jacobian2.to_numpy() eig_vals = np.linalg.eigvalsh(jacobian2) if np.isclose(eig_vals, 0.0).any(): - raise ValueError("singular Jacobian matrix, please add priors or " - "reduce number of variables") + raise ValueError( + "singular Jacobian matrix, please add priors or " + "reduce number of variables" + ) vcov = inv_hessian.dot(jacobian2) vcov = inv_hessian.dot(vcov.T) return vcov - def get_mat(self) -> List[ndarray]: + def get_mat(self) -> list[NDArray]: """Get the design matrices. Returns ------- - List[ndarray] + list[NDArray] The design matrices. + """ return [param.get_mat(self.df) for param in self.params] - def get_uvec(self) -> ndarray: + def get_uvec(self) -> NDArray: """Get the direct Uniform prior array. Returns ------- - ndarray + NDArray The direct Uniform prior array. + """ return np.hstack([param.get_uvec() for param in self.params]) - def get_gvec(self) -> ndarray: + def get_gvec(self) -> NDArray: """Get the direct Gaussian prior array. Returns ------- - ndarray + NDArray The direct Gaussian prior array. + """ return np.hstack([param.get_gvec() for param in self.params]) - def get_linear_uvec(self) -> ndarray: + def get_linear_uvec(self) -> NDArray: """Get the linear Uniform prior array. Returns ------- - ndarray + NDArray The linear Uniform prior array. + """ return np.hstack([param.get_linear_uvec() for param in self.params]) - def get_linear_gvec(self) -> ndarray: + def get_linear_gvec(self) -> NDArray: """Get the linear Gaussian prior array. Returns ------- - ndarray + NDArray The linear Gaussian prior array. + """ return np.hstack([param.get_linear_gvec() for param in self.params]) - def get_linear_umat(self) -> ndarray: + def get_linear_umat(self) -> NDArray: """Get the linear Uniform prior design matrix. Returns ------- - ndarray + NDArray The linear Uniform prior design matrix. + """ return block_diag(*[param.get_linear_umat() for param in self.params]) - def get_linear_gmat(self) -> ndarray: + def get_linear_gmat(self) -> NDArray: """Get the linear Gaussian prior design matrix. Returns ------- - ndarray + NDArray The linear Gaussian prior design matrix. + """ return block_diag(*[param.get_linear_gmat() for param in self.params]) - def split_coefs(self, coefs: ndarray) -> List[ndarray]: + def split_coefs(self, coefs: NDArray) -> list[NDArray]: """Split coefficients into pieces for each parameter. Parameters ---------- - coefs : ndarray + coefs : NDArray The coefficients array. Returns ------- - List[ndarray] + list[NDArray] A list of splitted coefficients for each parameter. + """ assert len(coefs) == self.size return [coefs[index] for index in self.indices] - def get_params(self, coefs: ndarray) -> List[ndarray]: + def get_params(self, coefs: NDArray) -> list[NDArray]: """Get the parameters. Parameters ---------- - coefs : ndarray + coefs : NDArray The coefficients array. Returns ------- - List[ndarray] + list[NDArray] The parameters. + """ coefs = self.split_coefs(coefs) - return [param.get_param(coefs[i], self.df, mat=self.mat[i]) - for i, param in enumerate(self.params)] + return [ + param.get_param(coefs[i], self.df, mat=self.mat[i]) + for i, param in enumerate(self.params) + ] - def get_dparams(self, coefs: ndarray) -> List[ndarray]: + def get_dparams(self, coefs: NDArray) -> list[NDArray]: """Get the derivative of the parameters. Parameters ---------- - coefs : ndarray + coefs : NDArray The coefficients array. Returns ------- - List[ndarray] + list[NDArray] The derivative of the parameters. + """ coefs = self.split_coefs(coefs) - return [param.get_dparam(coefs[i], self.df, mat=self.mat[i]) - for i, param in enumerate(self.params)] + return [ + param.get_dparam(coefs[i], self.df, mat=self.mat[i]) + for i, param in enumerate(self.params) + ] - def get_d2params(self, coefs: ndarray) -> List[ndarray]: + def get_d2params(self, coefs: NDArray) -> list[NDArray]: """Get the second order derivative of the parameters. Parameters ---------- - coefs : ndarray + coefs : NDArray The coefficients array. Returns ------- - List[ndarray] + list[NDArray] The second order derivative of the parameters. + """ coefs = self.split_coefs(coefs) - return [param.get_d2param(coefs[i], self.df, mat=self.mat[i]) - for i, param in enumerate(self.params)] + return [ + param.get_d2param(coefs[i], self.df, mat=self.mat[i]) + for i, param in enumerate(self.params) + ] - def nll(self, params: List[ndarray]) -> ndarray: + def nll(self, params: list[NDArray]) -> NDArray: """Negative log likelihood. Parameters ---------- - params : List[ndarray] + params : list[NDArray] A list of parameters. Returns ------- - ndarray + NDArray An array of negative log likelihood for each observation. + """ raise NotImplementedError() - def dnll(self, params: List[ndarray]) -> List[ndarray]: + def dnll(self, params: list[NDArray]) -> list[NDArray]: """Derivative of negative the log likelihood. Parameters ---------- - params : List[ndarray] + params : list[NDArray] A list of parameters. Returns ------- - List[ndarray] + list[NDArray] A list of derivatives for each parameter and each observation. + """ raise NotImplementedError() - def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: """Second order derivative of the negative log likelihood. Parameters ---------- - params : List[ndarray] + params : list[NDArray] A list of parameters. Returns ------- - List[List[ndarray]] + list[list[NDArray]] A list of list of second order derivatives for each parameter and each observation. + """ raise NotImplementedError() - def get_ui(self, - params: List[ndarray], - bounds: Tuple[float, float]) -> ndarray: + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: """Get uncertainty interval, used for the trimming algorithm. Parameters ---------- - params : List[ndarray] + params : list[NDArray] A list of parameters - bounds : Tuple[float, float] + bounds : tuple[float, float] Quantile bounds for the uncertainty interval. Returns ------- - ndarray + NDArray An array with uncertainty interval for each observation. + """ raise NotImplementedError() - def detect_outliers(self, - coefs: ndarray, - bounds: Tuple[float, float]) -> ndarray: + def detect_outliers(self, coefs: NDArray, bounds: tuple[float, float]) -> NDArray: """Detect outliers. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. - bounds : Tuple[float, float] + bounds : tuple[float, float] Quantile bounds for the inliers. Returns ------- - ndarray + NDArray A boolean array that indicate if observations are outliers. + """ params = self.get_params(coefs) ui = self.get_ui(params, bounds) obs = self.df[self.y].to_numpy() return (obs < ui[0]) | (obs > ui[1]) - def objective_from_gprior(self, coefs: ndarray) -> float: + def objective_from_gprior(self, coefs: NDArray) -> float: """Objective function from the Gaussian priors. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- float Objective function value. + """ - val = 0.5*np.sum((coefs - self.gvec[0])**2/self.gvec[1]**2) + val = 0.5 * np.sum((coefs - self.gvec[0]) ** 2 / self.gvec[1] ** 2) if self.linear_gvec.size > 0: - val += 0.5*np.sum((self.linear_gmat.dot(coefs) - self.linear_gvec[0])**2/self.linear_gvec[1]**2) + val += 0.5 * np.sum( + (self.linear_gmat.dot(coefs) - self.linear_gvec[0]) ** 2 + / self.linear_gvec[1] ** 2 + ) return val - def gradient_from_gprior(self, coefs: ndarray) -> ndarray: + def gradient_from_gprior(self, coefs: NDArray) -> NDArray: """Graident function from the Gaussian priors. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- - ndarray + NDArray Graident vector. + """ - grad = (coefs - self.gvec[0])/self.gvec[1]**2 + grad = (coefs - self.gvec[0]) / self.gvec[1] ** 2 if self.linear_gvec.size > 0: - grad += (self.linear_gmat.T/self.linear_gvec[1]**2).dot(self.linear_gmat.dot(coefs) - self.linear_gvec[0]) + grad += (self.linear_gmat.T / self.linear_gvec[1] ** 2).dot( + self.linear_gmat.dot(coefs) - self.linear_gvec[0] + ) return grad - def hessian_from_gprior(self) -> ndarray: + def hessian_from_gprior(self) -> NDArray: """Hessian matrix from the Gaussian prior. Returns ------- - ndarray + NDArray Hessian matrix. + """ - hess = np.diag(1.0/self.gvec[1]**2) + hess = np.diag(1.0 / self.gvec[1] ** 2) if self.linear_gvec.size > 0: - hess += (self.linear_gmat.T/self.linear_gvec[1]**2).dot(self.linear_gmat) + hess += (self.linear_gmat.T / self.linear_gvec[1] ** 2).dot( + self.linear_gmat + ) return hess - def get_nll_terms(self, coefs: ndarray) -> ndarray: + def get_nll_terms(self, coefs: NDArray) -> NDArray: params = self.get_params(coefs) nll_terms = self.nll(params) - nll_terms = self.weights*nll_terms + nll_terms = self.weights * nll_terms return nll_terms - def objective(self, coefs: ndarray) -> float: + def objective(self, coefs: NDArray) -> float: """Objective function. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- float Objective value. + """ nll_terms = self.get_nll_terms(coefs) - return self.trim_weights.dot(nll_terms) + \ - self.objective_from_gprior(coefs) + return self.trim_weights.dot(nll_terms) + self.objective_from_gprior(coefs) - def gradient(self, coefs: ndarray) -> ndarray: + def gradient(self, coefs: NDArray) -> NDArray: """Gradient function. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- - ndarray + NDArray Gradient vector. + """ params = self.get_params(coefs) dparams = self.get_dparams(coefs) grad_params = self.dnll(params) - weights = self.weights*self.trim_weights - return np.hstack([ - dparams[i].T.dot(weights*grad_params[i]) - for i in range(self.num_params) - ]) + self.gradient_from_gprior(coefs) + weights = self.weights * self.trim_weights + return np.hstack( + [dparams[i].T.dot(weights * grad_params[i]) for i in range(self.num_params)] + ) + self.gradient_from_gprior(coefs) - def hessian(self, coefs: ndarray) -> ndarray: + def hessian(self, coefs: NDArray) -> NDArray: """Hessian function. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- - ndarray + NDArray Hessian matrix. + """ params = self.get_params(coefs) dparams = self.get_dparams(coefs) d2params = self.get_d2params(coefs) grad_params = self.dnll(params) hess_params = self.d2nll(params) - weights = self.weights*self.trim_weights + weights = self.weights * self.trim_weights hess = [ - [(dparams[i].T*(weights*hess_params[i][j])).dot(dparams[j]) - for j in range(self.num_params)] + [ + (dparams[i].T * (weights * hess_params[i][j])).dot(dparams[j]) + for j in range(self.num_params) + ] for i in range(self.num_params) ] for i in range(self.num_params): - hess[i][i] += np.tensordot(weights*grad_params[i], d2params[i], axes=1) + hess[i][i] += np.tensordot(weights * grad_params[i], d2params[i], axes=1) return np.block(hess) + self.hessian_from_gprior() - def jacobian2(self, coefs: ndarray) -> ndarray: + def jacobian2(self, coefs: NDArray) -> NDArray: """Jacobian function. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- - ndarray + NDArray Jacobian matrix. + """ params = self.get_params(coefs) dparams = self.get_dparams(coefs) grad_params = self.dnll(params) - weights = self.weights*self.trim_weights - jacobian = np.vstack([ - dparams[i].T*(weights*grad_params[i]) - for i in range(self.num_params) - ]) + weights = self.weights * self.trim_weights + jacobian = np.vstack( + [dparams[i].T * (weights * grad_params[i]) for i in range(self.num_params)] + ) jacobian2 = jacobian.dot(jacobian.T) + self.hessian_from_gprior() return jacobian2 - def fit(self, - optimizer: Callable = scipy_optimize, - **optimizer_options): + def fit(self, optimizer: Callable = scipy_optimize, **optimizer_options): """Fit function. Parameters ---------- optimizer : Callable, optional Model solver, by default scipy_optimize. + """ if self.size == 0: self.opt_coefs = np.empty((0,)) @@ -533,19 +568,20 @@ def fit(self, return optimizer(self, **optimizer_options) - def predict(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame: + def predict(self, df: DataFrame | None = None) -> DataFrame: """Predict the parameters. Parameters ---------- - df : pd.DataFrame, optional + df : DataFrame, optional Data Frame with prediction data. If it is None, using the training data. Returns ------- - pd.DataFrame + DataFrame Data frame with predicted parameters. + """ if df is None: df = self.df @@ -558,7 +594,9 @@ def predict(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame: return df def __repr__(self) -> str: - return (f"{type(self).__name__}(" - f"bnum_y={self.df.shape[0]}, " - f"num_params={self.num_params}, " - f"size={self.size})") + return ( + f"{type(self).__name__}(" + f"bnum_y={self.df.shape[0]}, " + f"num_params={self.num_params}, " + f"size={self.size})" + ) diff --git a/src/regmod/models/negativebinomial.py b/src/regmod/models/negativebinomial.py index 49b802e..135b1fc 100644 --- a/src/regmod/models/negativebinomial.py +++ b/src/regmod/models/negativebinomial.py @@ -1,44 +1,55 @@ """ Negative Binomial Model """ -# pylint: disable=no-name-in-module -from typing import List, Tuple +# pylint: disable=no-name-in-module import numpy as np -import pandas as pd -from numpy import ndarray from scipy.special import digamma, loggamma, polygamma from scipy.stats import nbinom +from regmod._typing import DataFrame, NDArray + from .model import Model class NegativeBinomialModel(Model): - param_names: List[str] = ("n", "p") - default_param_specs = {"n": {"inv_link": "exp"}, - "p": {"inv_link": "expit"}} + param_names: list[str] = ("n", "p") + default_param_specs = {"n": {"inv_link": "exp"}, "p": {"inv_link": "expit"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not np.all(self.y >= 0): - raise ValueError("Negative-Binomial model requires observations to be non-negative.") - - def nll(self, params: List[ndarray]) -> ndarray: - return -(loggamma(params[0] + self.y) - - loggamma(params[0]) + - self.y*np.log(1 - params[1]) + - params[0]*np.log(params[1])) - - def dnll(self, params: List[ndarray]) -> List[ndarray]: - return [-(digamma(params[0] + self.y) - digamma(params[0]) + np.log(params[1])), - self.y/(1 - params[1]) - params[0]/params[1]] - - def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: - return [[polygamma(1, params[0]) - polygamma(1, params[0] + self.y), -1/params[1]], - [-1/params[1], params[0]/params[1]**2 + self.y/(1 - params[1])**2]] - - def get_ui(self, params: List[ndarray], bounds: Tuple[float, float]) -> np.ndarray: + raise ValueError( + "Negative-Binomial model requires observations to be non-negative." + ) + + def nll(self, params: list[NDArray]) -> NDArray: + return -( + loggamma(params[0] + self.y) + - loggamma(params[0]) + + self.y * np.log(1 - params[1]) + + params[0] * np.log(params[1]) + ) + + def dnll(self, params: list[NDArray]) -> list[NDArray]: + return [ + -(digamma(params[0] + self.y) - digamma(params[0]) + np.log(params[1])), + self.y / (1 - params[1]) - params[0] / params[1], + ] + + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: + return [ + [ + polygamma(1, params[0]) - polygamma(1, params[0] + self.y), + -1 / params[1], + ], + [ + -1 / params[1], + params[0] / params[1] ** 2 + self.y / (1 - params[1]) ** 2, + ], + ] + + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: n = params[0] p = params[1] - return [nbinom.ppf(bounds[0], n=n, p=p), - nbinom.ppf(bounds[1], n=n, p=p)] + return [nbinom.ppf(bounds[0], n=n, p=p), nbinom.ppf(bounds[1], n=n, p=p)] diff --git a/src/regmod/models/pogit.py b/src/regmod/models/pogit.py index e47e326..7d18bc0 100644 --- a/src/regmod/models/pogit.py +++ b/src/regmod/models/pogit.py @@ -1,40 +1,35 @@ """ Pogit Model """ -from typing import List, Tuple import numpy as np -import pandas as pd -from numpy import ndarray from scipy.stats import poisson +from regmod._typing import DataFrame, NDArray + from .model import Model class PogitModel(Model): param_names = ("p", "lam") - default_param_specs = {"p": {"inv_link": "expit"}, - "lam": {"inv_link": "exp"}} + default_param_specs = {"p": {"inv_link": "expit"}, "lam": {"inv_link": "exp"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not all(self.y >= 0): raise ValueError("Pogit model requires observations to be non-negagive.") - def nll(self, params: List[ndarray]) -> ndarray: - mean = params[0]*params[1] - return mean - self.y*np.log(mean) + def nll(self, params: list[NDArray]) -> NDArray: + mean = params[0] * params[1] + return mean - self.y * np.log(mean) - def dnll(self, params: List[ndarray]) -> List[ndarray]: - return [params[1] - self.y/params[0], - params[0] - self.y/params[1]] + def dnll(self, params: list[NDArray]) -> list[NDArray]: + return [params[1] - self.y / params[0], params[0] - self.y / params[1]] - def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: ones = np.ones(self.df.shape[0]) - return [[self.y/params[0]**2, ones], - [ones, self.y/params[1]**2]] + return [[self.y / params[0] ** 2, ones], [ones, self.y / params[1] ** 2]] - def get_ui(self, params: List[ndarray], bounds: Tuple[float, float]) -> ndarray: - mean = params[0]*params[1] - return [poisson.ppf(bounds[0], mu=mean), - poisson.ppf(bounds[1], mu=mean)] + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: + mean = params[0] * params[1] + return [poisson.ppf(bounds[0], mu=mean), poisson.ppf(bounds[1], mu=mean)] diff --git a/src/regmod/models/poisson.py b/src/regmod/models/poisson.py index c2dc929..9ef7c25 100644 --- a/src/regmod/models/poisson.py +++ b/src/regmod/models/poisson.py @@ -1,13 +1,11 @@ """ Poisson Model """ -from typing import Callable, List, Tuple import numpy as np -import pandas as pd -from numpy.typing import NDArray from scipy.stats import poisson +from regmod._typing import Callable, DataFrame, NDArray from regmod.optimizer import msca_optimize from .model import Model @@ -18,7 +16,7 @@ class PoissonModel(Model): param_names = ("lam",) default_param_specs = {"lam": {"inv_link": "exp"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not all(self.y >= 0): raise ValueError("Poisson model requires observations to be non-negagive.") @@ -36,17 +34,14 @@ def objective(self, coefs: NDArray) -> float: ------- float Objective value. + """ inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) - weights = self.weights*self.trim_weights - obj_param = weights * ( - param - self.y * np.log(param) - ) + weights = self.weights * self.trim_weights + obj_param = weights * (param - self.y * np.log(param)) return obj_param.sum() + self.objective_from_gprior(coefs) def gradient(self, coefs: NDArray) -> NDArray: @@ -61,19 +56,16 @@ def gradient(self, coefs: NDArray) -> NDArray: ------- NDArray Gradient vector. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights - grad_param = weights * ( - 1 - self.y / param - ) * dparam + weights = self.weights * self.trim_weights + grad_param = weights * (1 - self.y / param) * dparam return mat.T.dot(grad_param) + self.gradient_from_gprior(coefs) @@ -89,20 +81,18 @@ def hessian(self, coefs: NDArray) -> NDArray: ------- NDArray Hessian matrix. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) d2param = inv_link.d2fun(lin_param) - weights = self.weights*self.trim_weights + weights = self.weights * self.trim_weights hess_param = weights * ( - self.y / param**2 * dparam**2 + - (1 - self.y / param) * d2param + self.y / param**2 * dparam**2 + (1 - self.y / param) * d2param ) scaled_mat = mat.scale_rows(hess_param) @@ -122,46 +112,40 @@ def jacobian2(self, coefs: NDArray) -> NDArray: ------- NDArray Jacobian matrix. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights - grad_param = weights * (1.0 - self.y/param) * dparam + weights = self.weights * self.trim_weights + grad_param = weights * (1.0 - self.y / param) * dparam jacobian = mat.T.scale_cols(grad_param) hess_mat_gprior = type(jacobian)(self.hessian_from_gprior()) jacobian2 = jacobian.dot(jacobian.T) + hess_mat_gprior return jacobian2 - def fit(self, - optimizer: Callable = msca_optimize, - **optimizer_options): + def fit(self, optimizer: Callable = msca_optimize, **optimizer_options): """Fit function. Parameters ---------- optimizer : Callable, optional Model solver, by default scipy_optimize. + """ - super().fit( - optimizer=optimizer, - **optimizer_options - ) + super().fit(optimizer=optimizer, **optimizer_options) - def nll(self, params: List[NDArray]) -> NDArray: - return params[0] - self.y*np.log(params[0]) + def nll(self, params: list[NDArray]) -> NDArray: + return params[0] - self.y * np.log(params[0]) - def dnll(self, params: List[NDArray]) -> List[NDArray]: - return [1.0 - self.y/params[0]] + def dnll(self, params: list[NDArray]) -> list[NDArray]: + return [1.0 - self.y / params[0]] - def d2nll(self, params: List[NDArray]) -> List[List[NDArray]]: - return [[self.y/params[0]**2]] + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: + return [[self.y / params[0] ** 2]] - def get_ui(self, params: List[NDArray], bounds: Tuple[float, float]) -> NDArray: + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: mean = params[0] - return [poisson.ppf(bounds[0], mu=mean), - poisson.ppf(bounds[1], mu=mean)] + return [poisson.ppf(bounds[0], mu=mean), poisson.ppf(bounds[1], mu=mean)] diff --git a/src/regmod/models/tobit.py b/src/regmod/models/tobit.py index df4818e..01a708f 100644 --- a/src/regmod/models/tobit.py +++ b/src/regmod/models/tobit.py @@ -1,16 +1,13 @@ """ Tobit Model """ -# pylint: disable=C0103 -from typing import List, Optional +# pylint: disable=C0103 import jax.numpy as jnp from jax import grad, hessian, jit, lax -from jax.numpy import DeviceArray from jax.scipy.stats.norm import logcdf, logpdf -from numpy.typing import ArrayLike -from pandas import DataFrame +from regmod._typing import ArrayLike, DataFrame, JaxArray from regmod.function import SmoothFunction from .model import Model @@ -20,16 +17,12 @@ fun=jnp.array, inv_fun=jnp.array, dfun=jnp.ones_like, - d2fun=jnp.zeros_like + d2fun=jnp.zeros_like, ) exp_jax = SmoothFunction( - name="exp_jax", - fun=jnp.exp, - inv_fun=jnp.log, - dfun=jnp.exp, - d2fun=jnp.exp + name="exp_jax", fun=jnp.exp, inv_fun=jnp.log, dfun=jnp.exp, d2fun=jnp.exp ) @@ -43,10 +36,7 @@ class TobitModel(Model): """ param_names = ("mu", "sigma") - default_param_specs = { - "mu": {"inv_link": 'identity'}, - "sigma": {"inv_link": 'exp'} - } + default_param_specs = {"mu": {"inv_link": "identity"}, "sigma": {"inv_link": "exp"}} def __init__(self, *args, **kwargs) -> None: """Initialize tobit model. @@ -72,9 +62,9 @@ def __init__(self, *args, **kwargs) -> None: # Use JAX inv_link functions for param in self.params: link_name = param.inv_link.name - if link_name == 'identity': + if link_name == "identity": param.inv_link = identity_jax - elif link_name == 'exp': + elif link_name == "exp": param.inv_link = exp_jax else: msg = f"No JAX implementation of {link_name} inv_link." @@ -103,12 +93,15 @@ def attach_df(self, df: DataFrame) -> None: # Data structures for objective self.offset = [ - jnp.asarray(df[param.offset]) - if param.offset is not None else jnp.zeros(df.shape[0]) + ( + jnp.asarray(df[param.offset]) + if param.offset is not None + else jnp.zeros(df.shape[0]) + ) for param in self.params ] self.y = jnp.asarray(self.y) - self.weights = jnp.asarray(self.trim_weights*self.weights) + self.weights = jnp.asarray(self.trim_weights * self.weights) def objective(self, coefs: ArrayLike) -> float: """Get negative log likelihood wrt coefficients. @@ -125,12 +118,20 @@ def objective(self, coefs: ArrayLike) -> float: """ coef_list = [coefs[index] for index in self.indices] - link_list = [param.inv_link.name == 'exp_jax' for param in self.params] - return _objective(coef_list, link_list, self.mat, self.offset, - self.y, self.weights, self.gvec, - self.linear_gvec, self.linear_gmat) + link_list = [param.inv_link.name == "exp_jax" for param in self.params] + return _objective( + coef_list, + link_list, + self.mat, + self.offset, + self.y, + self.weights, + self.gvec, + self.linear_gvec, + self.linear_gmat, + ) - def gradient(self, coefs: ArrayLike) -> DeviceArray: + def gradient(self, coefs: ArrayLike) -> JaxArray: """Get gradient of negative log likelihood wrt coefficients. Parameters @@ -140,18 +141,26 @@ def gradient(self, coefs: ArrayLike) -> DeviceArray: Returns ------- - DeviceArray + JaxArray Gradient of negative log likelihood. """ coef_list = [coefs[index] for index in self.indices] - link_list = [param.inv_link.name == 'exp_jax' for param in self.params] - temp = _gradient(coef_list, link_list, self.mat, self.offset, - self.y, self.weights, self.gvec, - self.linear_gvec, self.linear_gmat) + link_list = [param.inv_link.name == "exp_jax" for param in self.params] + temp = _gradient( + coef_list, + link_list, + self.mat, + self.offset, + self.y, + self.weights, + self.gvec, + self.linear_gvec, + self.linear_gmat, + ) return jnp.concatenate(temp) - def hessian(self, coefs: ArrayLike) -> DeviceArray: + def hessian(self, coefs: ArrayLike) -> JaxArray: """Get hessian of negative log likelihood wrt coefficients. Parameters @@ -161,22 +170,29 @@ def hessian(self, coefs: ArrayLike) -> DeviceArray: Returns ------- - DeviceArray + JaxArray Hessian of negative log likelihood. """ coef_list = [coefs[index] for index in self.indices] - link_list = [param.inv_link.name == 'exp_jax' for param in self.params] - temp = _hessian(coef_list, link_list, self.mat, self.offset, - self.y, self.weights, self.gvec, - self.linear_gvec, self.linear_gmat) - hess = jnp.concatenate([ - jnp.concatenate(temp[0], axis=1), - jnp.concatenate(temp[1], axis=1) - ], axis=0) + link_list = [param.inv_link.name == "exp_jax" for param in self.params] + temp = _hessian( + coef_list, + link_list, + self.mat, + self.offset, + self.y, + self.weights, + self.gvec, + self.linear_gvec, + self.linear_gmat, + ) + hess = jnp.concatenate( + [jnp.concatenate(temp[0], axis=1), jnp.concatenate(temp[1], axis=1)], axis=0 + ) return hess - def nll(self, params: List[ArrayLike]) -> DeviceArray: + def nll(self, params: list[ArrayLike]) -> JaxArray: """Get terms of negative log likelihood wrt parameters. Parameters @@ -186,13 +202,13 @@ def nll(self, params: List[ArrayLike]) -> DeviceArray: Returns ------- - DeviceArray + JaxArray Terms of negative log likelihood. """ return _nll(self.y, params) - def dnll(self, params: List[ArrayLike]) -> List[DeviceArray]: + def dnll(self, params: list[ArrayLike]) -> list[JaxArray]: """Get derivative of negative log likelihood wrt parameters. Parameters @@ -202,13 +218,13 @@ def dnll(self, params: List[ArrayLike]) -> List[DeviceArray]: Returns ------- - list[DeviceArray] + list[JaxArray] Derivatives of negative log likelihood. """ return _dnll(self.y, params) - def get_vcov(self, coefs: ArrayLike) -> DeviceArray: + def get_vcov(self, coefs: ArrayLike) -> JaxArray: """Get variance-covariance matrix. Parameters @@ -218,7 +234,7 @@ def get_vcov(self, coefs: ArrayLike) -> DeviceArray: Returns ------- - DeviceArray + JaxArray Variance-covariance matrix. Notes @@ -232,7 +248,7 @@ def get_vcov(self, coefs: ArrayLike) -> DeviceArray: inv_H = jnp.linalg.inv(H) return inv_H.dot(J.dot(inv_H.T)) - def predict(self, df: Optional[DataFrame] = None) -> DataFrame: + def predict(self, df: DataFrame | None = None) -> DataFrame: """Predict mu, sigma, and censored mu. Parameters @@ -253,10 +269,17 @@ def predict(self, df: Optional[DataFrame] = None) -> DataFrame: @jit -def _objective(coef_list: List[ArrayLike], link_list: List[bool], - mat: List[ArrayLike], offset: ArrayLike, y: ArrayLike, - weights: ArrayLike, gvec: ArrayLike, linear_gvec: ArrayLike, - linear_gmat: ArrayLike) -> float: +def _objective( + coef_list: list[ArrayLike], + link_list: list[bool], + mat: list[ArrayLike], + offset: ArrayLike, + y: ArrayLike, + weights: ArrayLike, + gvec: ArrayLike, + linear_gvec: ArrayLike, + linear_gmat: ArrayLike, +) -> float: """Get negative log likelihood wrt coefficients. Parameters @@ -294,28 +317,29 @@ def _objective(coef_list: List[ArrayLike], link_list: List[bool], link_list[ii], lambda x: jnp.exp(x), lambda x: x, - mat[ii].dot(coef_list[ii]) + offset[ii] + mat[ii].dot(coef_list[ii]) + offset[ii], ) ) nll_terms = _nll(y, param_list) - obj_param = jnp.sum(weights*nll_terms) + obj_param = jnp.sum(weights * nll_terms) # Get objective from prior coefs = jnp.concatenate(coef_list) - obj_prior = jnp.sum((coefs - gvec[0])**2/gvec[1]**2)/2 + obj_prior = jnp.sum((coefs - gvec[0]) ** 2 / gvec[1] ** 2) / 2 obj_prior = lax.cond( linear_gvec.size > 0, - lambda x: x + 0.5*jnp.sum((linear_gmat.dot(coefs) - - linear_gvec[0])**2/linear_gvec[1]**2), + lambda x: x + + 0.5 + * jnp.sum((linear_gmat.dot(coefs) - linear_gvec[0]) ** 2 / linear_gvec[1] ** 2), lambda x: x, - obj_prior + obj_prior, ) return obj_param + obj_prior @jit -def _nll(y: ArrayLike, params: List[ArrayLike]) -> DeviceArray: +def _nll(y: ArrayLike, params: list[ArrayLike]) -> JaxArray: """Get terms of negative log likelihood wrt parameters. Parameters @@ -327,14 +351,14 @@ def _nll(y: ArrayLike, params: List[ArrayLike]) -> DeviceArray: Returns ------- - DeviceArray + JaxArray Terms of negative log likelihood. """ mu = params[0] sigma = params[1] - pos_term = jnp.log(sigma) - logpdf((y - mu)/sigma) - npos_term = -logcdf(-mu/sigma) + pos_term = jnp.log(sigma) - logpdf((y - mu) / sigma) + npos_term = -logcdf(-mu / sigma) return jnp.where(y > 0, pos_term, npos_term) diff --git a/src/regmod/models/utils.py b/src/regmod/models/utils.py index 0cdd39a..e1f7e22 100644 --- a/src/regmod/models/utils.py +++ b/src/regmod/models/utils.py @@ -1,17 +1,16 @@ -from typing import Tuple - import numpy as np -from msca.linalg.matrix import Matrix, asmatrix -from numpy.typing import NDArray +from msca.linalg.matrix import asmatrix from scipy.sparse import csc_matrix +from regmod._typing import Matrix, NDArray + def model_post_init( mat: NDArray, uvec: NDArray, linear_umat: NDArray, linear_uvec: NDArray, -) -> Tuple[Matrix, Matrix, NDArray]: +) -> tuple[Matrix, Matrix, NDArray]: # design matrix issparse = mat.size == 0 or ((mat == 0).sum() / mat.size) > 0.95 if issparse: @@ -31,12 +30,8 @@ def model_post_init( cmat = cmat / scale[:, np.newaxis] cvec = cvec / scale - cmat = np.vstack([ - -cmat[~np.isneginf(cvec[0])], cmat[~np.isposinf(cvec[1])] - ]) - cvec = np.hstack([ - -cvec[0][~np.isneginf(cvec[0])], cvec[1][~np.isposinf(cvec[1])] - ]) + cmat = np.vstack([-cmat[~np.isneginf(cvec[0])], cmat[~np.isposinf(cvec[1])]]) + cvec = np.hstack([-cvec[0][~np.isneginf(cvec[0])], cvec[1][~np.isposinf(cvec[1])]]) if issparse: cmat = csc_matrix(cmat).astype(np.float64) cmat = asmatrix(cmat) diff --git a/src/regmod/models/weibull.py b/src/regmod/models/weibull.py index a17ec46..b65a65b 100644 --- a/src/regmod/models/weibull.py +++ b/src/regmod/models/weibull.py @@ -1,13 +1,12 @@ """ Weibull Model """ -from typing import List, Tuple import numpy as np -import pandas as pd -from numpy import ndarray from scipy.stats import weibull_min +from regmod._typing import DataFrame, NDArray + from .model import Model @@ -15,29 +14,43 @@ class WeibullModel(Model): param_names = ("b", "k") default_param_specs = {"b": {"inv_link": "exp"}, "k": {"inv_link": "exp"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not all(self.y > 0): raise ValueError("Weibull model requires observations to be positive.") - def nll(self, params: List[ndarray]) -> ndarray: + def nll(self, params: list[NDArray]) -> NDArray: t = self.y ln_t = np.log(t) - return params[0]*(t**params[1]) - (params[1] - 1)*ln_t - np.log(params[0]) - np.log(params[1]) - - def dnll(self, params: List[ndarray]) -> List[ndarray]: + return ( + params[0] * (t ** params[1]) + - (params[1] - 1) * ln_t + - np.log(params[0]) + - np.log(params[1]) + ) + + def dnll(self, params: list[NDArray]) -> list[NDArray]: t = self.y ln_t = np.log(t) - return [t**params[1] - 1/params[0], - ln_t*params[0]*(t**params[1]) - ln_t - 1/params[1]] + return [ + t ** params[1] - 1 / params[0], + ln_t * params[0] * (t ** params[1]) - ln_t - 1 / params[1], + ] - def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: t = self.y ln_t = np.log(t) - return [[1/params[0]**2, ln_t*(t**params[1])], - [ln_t*(t**params[1]), 1/params[1]**2 + params[0]*(ln_t**2)*(t**params[1])]] - - def get_ui(self, params: List[ndarray], bounds: Tuple[float, float]) -> ndarray: - scale = 1 / params[0]**(1 / params[1]) - return [weibull_min.ppf(bounds[0], c=params[1], scale=scale), - weibull_min.ppf(bounds[1], c=params[1], scale=scale)] + return [ + [1 / params[0] ** 2, ln_t * (t ** params[1])], + [ + ln_t * (t ** params[1]), + 1 / params[1] ** 2 + params[0] * (ln_t**2) * (t ** params[1]), + ], + ] + + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: + scale = 1 / params[0] ** (1 / params[1]) + return [ + weibull_min.ppf(bounds[0], c=params[1], scale=scale), + weibull_min.ppf(bounds[1], c=params[1], scale=scale), + ] diff --git a/src/regmod/optimizer.py b/src/regmod/optimizer.py index 382e3e3..a96886f 100644 --- a/src/regmod/optimizer.py +++ b/src/regmod/optimizer.py @@ -1,18 +1,18 @@ """ Optimizer module """ -from typing import Callable, Dict, Optional import numpy as np from msca.optim.prox import proj_capped_simplex from msca.optim.solver import IPSolver, NTSolver -from numpy.typing import NDArray from scipy.optimize import LinearConstraint, minimize +from regmod._typing import Callable, NDArray -def scipy_optimize(model: "Model", - x0: Optional[NDArray] = None, - options: Optional[Dict] = None) -> NDArray: + +def scipy_optimize( + model: "Model", x0: NDArray | None = None, options: dict | None = None +) -> NDArray: """Scipy trust-region optimizer. Parameters @@ -22,29 +22,37 @@ def scipy_optimize(model: "Model", x0 : NDArray, optional Initial guess for the variable, by default None. If `None` use zero vector as the initial guess. - options : Dict, optional + options : dict, optional Scipy solver options, by default None. Returns ------- NDArray Optimal solution. + """ x0 = np.zeros(model.size) if x0 is None else x0 bounds = model.uvec.T - constraints = [LinearConstraint( - model.linear_umat, - model.linear_uvec[0], - model.linear_uvec[1] - )] if model.linear_uvec.size > 0 else [] - - result = minimize(model.objective, x0, - method="trust-constr", - jac=model.gradient, - hess=model.hessian, - constraints=constraints, - bounds=bounds, - options=options) + constraints = ( + [ + LinearConstraint( + model.linear_umat, model.linear_uvec[0], model.linear_uvec[1] + ) + ] + if model.linear_uvec.size > 0 + else [] + ) + + result = minimize( + model.objective, + x0, + method="trust-constr", + jac=model.gradient, + hess=model.hessian, + constraints=constraints, + bounds=bounds, + options=options, + ) model.opt_result = result model.opt_coefs = result.x.copy() @@ -52,25 +60,17 @@ def scipy_optimize(model: "Model", return result.x -def msca_optimize(model: "Model", - x0: Optional[NDArray] = None, - options: Optional[Dict] = None) -> NDArray: +def msca_optimize( + model: "Model", x0: NDArray | None = None, options: dict | None = None +) -> NDArray: x0 = np.zeros(model.size) if x0 is None else x0 options = options or {} if model.cmat.size == 0: - solver = NTSolver( - model.objective, - model.gradient, - model.hessian - ) + solver = NTSolver(model.objective, model.gradient, model.hessian) else: solver = IPSolver( - model.objective, - model.gradient, - model.hessian, - model.cmat, - model.cvec + model.objective, model.gradient, model.hessian, model.cmat, model.cvec ) result = solver.minimize(x0=x0, **options) model.opt_result = result @@ -90,6 +90,7 @@ def set_trim_weights(model: "Model", index: NDArray, mask: float): Index where the weights need to be set. mask : float Value of the weights to set. + """ weights = np.ones(model.df.num_obs) weights[index] = mask @@ -108,19 +109,23 @@ def trimming(optimize: Callable) -> Callable: ------- Callable Trimming optimization solver. + """ - def optimize_with_trimming(model: "Model", - x0: NDArray = None, - options: Dict = None, - trim_steps: int = 3, - inlier_pct: float = 0.95) -> NDArray: + + def optimize_with_trimming( + model: "Model", + x0: NDArray = None, + options: dict = None, + trim_steps: int = 3, + inlier_pct: float = 0.95, + ) -> NDArray: if trim_steps < 2: raise ValueError("At least two trimming steps.") if inlier_pct < 0.0 or inlier_pct > 1.0: raise ValueError("inlier_pct has to be between 0 and 1.") coefs = optimize(model, x0, options) if inlier_pct < 1.0: - bounds = (0.5 - 0.5*inlier_pct, 0.5 + 0.5*inlier_pct) + bounds = (0.5 - 0.5 * inlier_pct, 0.5 + 0.5 * inlier_pct) index = model.detect_outliers(coefs, bounds) if index.sum() > 0: masks = np.append(np.linspace(1.0, 0.0, trim_steps)[1:], 0.0) @@ -129,6 +134,7 @@ def optimize_with_trimming(model: "Model", coefs = optimize(model, coefs, options) index = model.detect_outliers(coefs, bounds) return coefs + return optimize_with_trimming @@ -136,10 +142,10 @@ def original_trimming(optimize: Callable) -> Callable: def optimize_with_trimming( model: "Model", x0: NDArray = None, - options: Optional[Dict] = None, + options: dict | None = None, trim_steps: int = 10, step_size: float = 10.0, - inlier_pct: float = 0.95 + inlier_pct: float = 0.95, ) -> NDArray: if trim_steps < 2: raise ValueError("At least two trimming steps.") @@ -147,24 +153,24 @@ def optimize_with_trimming( raise ValueError("inlier_pct has to be between 0 and 1.") coefs = optimize(model, x0, options) if inlier_pct < 1.0: - num_inliers = int(inlier_pct*model.y.size) + num_inliers = int(inlier_pct * model.y.size) counter = 0 success = False while (counter < trim_steps) and (not success): counter += 1 nll_terms = model.get_nll_terms(coefs) model.trim_weights = proj_capped_simplex( - model.trim_weights - step_size*nll_terms, - num_inliers + model.trim_weights - step_size * nll_terms, num_inliers ) coefs = optimize(model, x0, options) success = all( - np.isclose(model.trim_weights, 0.0) | - np.isclose(model.trim_weights, 1.0) + np.isclose(model.trim_weights, 0.0) + | np.isclose(model.trim_weights, 1.0) ) if not success: sort_indices = np.argsort(model.trim_weights) model.trim_weights[sort_indices[-num_inliers:]] = 1.0 model.trim_weights[sort_indices[:-num_inliers]] = 0.0 return coefs + return optimize_with_trimming diff --git a/src/regmod/parameter.py b/src/regmod/parameter.py index 29b68e2..8bb1968 100644 --- a/src/regmod/parameter.py +++ b/src/regmod/parameter.py @@ -1,13 +1,13 @@ """ Parameter module """ + from dataclasses import dataclass, field -from typing import Optional, Union import numpy as np -import pandas as pd from scipy.linalg import block_diag +from regmod._typing import DataFrame, NDArray from regmod.function import SmoothFunction, fun_dict from regmod.prior import LinearGaussianPrior, LinearUniformPrior from regmod.variable import SplineVariable, Variable @@ -40,36 +40,44 @@ class Parameter: name: str variables: list[Variable] = field(default_factory=list, repr=False) - inv_link: Union[str, SmoothFunction] = field(default="identity", repr=False) - offset: Optional[str] = field(default=None, repr=False) + inv_link: str | SmoothFunction = field(default="identity", repr=False) + offset: str | None = field(default=None, repr=False) linear_gpriors: list[LinearGaussianPrior] = field(default_factory=list, repr=False) linear_upriors: list[LinearUniformPrior] = field(default_factory=list, repr=False) def __post_init__(self): if isinstance(self.inv_link, str): self.inv_link = fun_dict[self.inv_link] - assert isinstance(self.inv_link, SmoothFunction), \ - "inv_link has to be an instance of SmoothFunction." - assert all([isinstance(prior, LinearGaussianPrior) for prior in self.linear_gpriors]), \ - "linear_gpriors has to be a list of LinearGaussianPrior." - assert all([isinstance(prior, LinearUniformPrior) for prior in self.linear_upriors]), \ - "linear_upriors has to be a list of LinearUniformPrior." - assert all([prior.mat.shape[1] == self.size for prior in self.linear_gpriors]), \ - "linear_gpriors size not match." - assert all([prior.mat.shape[1] == self.size for prior in self.linear_upriors]), \ - "linear_upriors size not match." - assert all([isinstance(var, Variable) for var in self.variables]), \ - "variables has to be a list of Variable." + assert isinstance( + self.inv_link, SmoothFunction + ), "inv_link has to be an instance of SmoothFunction." + assert all( + [isinstance(prior, LinearGaussianPrior) for prior in self.linear_gpriors] + ), "linear_gpriors has to be a list of LinearGaussianPrior." + assert all( + [isinstance(prior, LinearUniformPrior) for prior in self.linear_upriors] + ), "linear_upriors has to be a list of LinearUniformPrior." + assert all( + [prior.mat.shape[1] == self.size for prior in self.linear_gpriors] + ), "linear_gpriors size not match." + assert all( + [prior.mat.shape[1] == self.size for prior in self.linear_upriors] + ), "linear_upriors size not match." + assert all( + [isinstance(var, Variable) for var in self.variables] + ), "variables has to be a list of Variable." if len(self.variables) == 0 and self.offset is None: - raise ValueError("Please provide at least one variable when " - "parameter does not have an offset") + raise ValueError( + "Please provide at least one variable when " + "parameter does not have an offset" + ) @property def size(self) -> int: """Size of the parameter.""" return sum([var.size for var in self.variables]) - def check_data(self, df: pd.DataFrame): + def check_data(self, df: DataFrame): """Attach data to all variables. Parameters @@ -81,7 +89,7 @@ def check_data(self, df: pd.DataFrame): for var in self.variables: var.check_data(df) - def get_mat(self, df: pd.DataFrame) -> np.ndarray: + def get_mat(self, df: DataFrame) -> NDArray: """Get the design matrix. Parameters @@ -91,7 +99,7 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: Returns ------- - np.ndarray + NDArray The design matrix. """ @@ -99,125 +107,161 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: return np.empty(shape=(df.shape[0], 0)) return np.hstack([var.get_mat(df) for var in self.variables]) - def get_uvec(self) -> np.ndarray: + def get_uvec(self) -> NDArray: """Get the direct Uniform prior. Returns ------- - np.ndarray + NDArray Uniform prior information array. + """ if len(self.variables) == 0: return np.empty(shape=(2, 0)) return np.hstack([var.get_uvec() for var in self.variables]) - def get_gvec(self) -> np.ndarray: + def get_gvec(self) -> NDArray: """Get the direct Gaussian prior. Returns ------- - np.ndarray + NDArray Gaussian prior information array. + """ if len(self.variables) == 0: return np.empty(shape=(2, 0)) return np.hstack([var.get_gvec() for var in self.variables]) - def get_linear_uvec(self) -> np.ndarray: + def get_linear_uvec(self) -> NDArray: """Get the linear Uniform prior vector. Returns ------- - np.ndarray + NDArray Uniform prior information array. + """ if len(self.variables) == 0: return np.empty(shape=(2, 0)) - uvec = np.hstack([ - var.get_linear_uvec() if isinstance(var, SplineVariable) else np.empty((2, 0)) - for var in self.variables - ]) + uvec = np.hstack( + [ + ( + var.get_linear_uvec() + if isinstance(var, SplineVariable) + else np.empty((2, 0)) + ) + for var in self.variables + ] + ) if len(self.linear_upriors) > 0: - uvec = np.hstack([uvec] + [np.vstack([prior.lb, prior.ub]) - for prior in self.linear_upriors]) + uvec = np.hstack( + [uvec] + + [np.vstack([prior.lb, prior.ub]) for prior in self.linear_upriors] + ) return uvec - def get_linear_gvec(self) -> np.ndarray: + def get_linear_gvec(self) -> NDArray: """Get the linear Gaussian prior vector. Returns ------- - np.ndarray + NDArray Gaussian prior information array. + """ if len(self.variables) == 0: return np.empty(shape=(2, 0)) - gvec = np.hstack([ - var.get_linear_gvec() if isinstance(var, SplineVariable) else np.empty((2, 0)) - for var in self.variables - ]) + gvec = np.hstack( + [ + ( + var.get_linear_gvec() + if isinstance(var, SplineVariable) + else np.empty((2, 0)) + ) + for var in self.variables + ] + ) if len(self.linear_gpriors) > 0: - gvec = np.hstack([gvec] + [np.vstack([prior.mean, prior.sd]) - for prior in self.linear_gpriors]) + gvec = np.hstack( + [gvec] + + [np.vstack([prior.mean, prior.sd]) for prior in self.linear_gpriors] + ) return gvec - def get_linear_umat(self) -> np.ndarray: + def get_linear_umat(self) -> NDArray: """Get the linear Uniform prior matrix. Returns ------- - np.ndarray + NDArray Uniform prior design matrix. + """ if len(self.variables) == 0: return np.empty(shape=(0, 0)) - umat = block_diag(*[ - var.get_linear_umat() if isinstance(var, SplineVariable) else np.empty((0, 1)) - for var in self.variables - ]) + umat = block_diag( + *[ + ( + var.get_linear_umat() + if isinstance(var, SplineVariable) + else np.empty((0, 1)) + ) + for var in self.variables + ] + ) if len(self.linear_upriors) > 0: umat = np.vstack([umat] + [prior.mat for prior in self.linear_upriors]) return umat - def get_linear_gmat(self) -> np.ndarray: + def get_linear_gmat(self) -> NDArray: """Get the linear Gaussian prior matrix. Returns ------- - np.ndarray + NDArray Gaussian prior design matrix. + """ if len(self.variables) == 0: return np.empty(shape=(0, 0)) - gmat = block_diag(*[ - var.get_linear_gmat() if isinstance(var, SplineVariable) else np.empty((0, 1)) - for var in self.variables - ]) + gmat = block_diag( + *[ + ( + var.get_linear_gmat() + if isinstance(var, SplineVariable) + else np.empty((0, 1)) + ) + for var in self.variables + ] + ) if len(self.linear_gpriors) > 0: gmat = np.vstack([gmat] + [prior.mat for prior in self.linear_gpriors]) return gmat - def get_lin_param(self, - coefs: np.ndarray, - df: pd.DataFrame, - mat: np.ndarray = None, - return_mat: bool = False) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]: + def get_lin_param( + self, + coefs: NDArray, + df: DataFrame, + mat: NDArray = None, + return_mat: bool = False, + ) -> NDArray | tuple[NDArray, NDArray]: """Get the parameter before apply the link function. Parameters ---------- - coefs : np.ndarray + coefs : NDArray Coefficients for the design matrix. df Data frame that contains all the covariates. - mat : np.ndarray, optional + mat : NDArray, optional Alternative design matrix, by default None. return_mat : bool, optional If `True`, return the created design matrix, by default False. Returns ------- - Union[np.ndarray, tuple[np.ndarray, np.ndarray]] + NDArray | tuple[NDArray, NDArray] Linear parameter vector, or when `return_mat=True` also returns the design matrix. @@ -233,75 +277,73 @@ def get_lin_param(self, return lin_param, mat return lin_param - def get_param(self, - coefs: np.ndarray, - df: pd.DataFrame, - mat: np.ndarray = None) -> np.ndarray: + def get_param(self, coefs: NDArray, df: DataFrame, mat: NDArray = None) -> NDArray: """Get the parameter. Parameters ---------- - coefs : np.ndarray + coefs : NDArray Coefficients for the design matrix. df Data frame that contains all the covariates. - mat : np.ndarray, optional + mat : NDArray, optional Alternative design matrix, by default None. Returns ------- - np.ndarray + NDArray Returns the parameter. + """ lin_param = self.get_lin_param(coefs, df, mat) return self.inv_link.fun(lin_param) - def get_dparam(self, - coefs: np.ndarray, - df: pd.DataFrame, - mat: np.ndarray = None) -> np.ndarray: + def get_dparam(self, coefs: NDArray, df: DataFrame, mat: NDArray = None) -> NDArray: """Get the derivative of the parameter. Parameters ---------- - coefs : np.ndarray + coefs : NDArray Coefficients for the design matrix. df Data frame that contains all the covariates. - mat : np.ndarray, optional + mat : NDArray, optional Alternative design matrix, by default None. Returns ------- - np.ndarray + NDArray Returns the derivative of the parameter. + """ if len(self.variables) == 0: return np.empty((df.shape[0], 0)) lin_param, mat = self.get_lin_param(coefs, df, mat, return_mat=True) - return self.inv_link.dfun(lin_param)[:, None]*mat + return self.inv_link.dfun(lin_param)[:, None] * mat - def get_d2param(self, - coefs: np.ndarray, - df: pd.DataFrame, - mat: np.ndarray = None) -> np.ndarray: + def get_d2param( + self, coefs: NDArray, df: DataFrame, mat: NDArray = None + ) -> NDArray: """Get the second order derivative of the parameter. Parameters ---------- - coefs : np.ndarray + coefs : NDArray Coefficients for the design matrix. df Data frame that contains all the covariates. - mat : np.ndarray, optional + mat : NDArray, optional Alternative design matrix, by default None. Returns ------- - np.ndarray + NDArray Returns the second order derivative of the parameter. + """ if len(self.variables) == 0: return np.empty((df.shape[0], 0, 0)) lin_param, mat = self.get_lin_param(coefs, df, mat, return_mat=True) - return self.inv_link.d2fun(lin_param)[:, None, None]*(mat[..., None]*mat[:, None, :]) + return self.inv_link.d2fun(lin_param)[:, None, None] * ( + mat[..., None] * mat[:, None, :] + ) diff --git a/src/regmod/prior.py b/src/regmod/prior.py index ea88fb1..4e043f0 100644 --- a/src/regmod/prior.py +++ b/src/regmod/prior.py @@ -1,14 +1,13 @@ """ Prior module """ -from collections.abc import Iterable + from dataclasses import dataclass, field -from typing import Any, List import numpy as np from xspline import XSpline -from regmod.utils import default_vec_factory +from regmod._typing import Any, Iterable, NDArray @dataclass @@ -18,7 +17,7 @@ class Prior: Parameters ---------- - size : Optional[int], optional + size : int, optional Size of variable. Default is `None`. When it is `None`, size is inferred from the vector information of the prior. @@ -36,16 +35,17 @@ class Prior: ----- We should figure out a better structure for linear and spline prior, so that the extensions will be easier. + """ size: int = None - def process_size(self, vecs: List[Any]): + def process_size(self, vecs: list[Any]): """Infer and validate size from given vector information. Parameters ---------- - vecs : List[Any] + vecs : list[Any] Vector information of the prior. For Gaussian prior it will be mean and standard deviation. For Uniform prior it will be lower and upper bounds. @@ -54,6 +54,7 @@ def process_size(self, vecs: List[Any]): ------ ValueError Raised when size is not positive or integer. + """ if self.size is None: sizes = [len(vec) for vec in vecs if isinstance(vec, Iterable)] @@ -73,10 +74,10 @@ class GaussianPrior(Prior): size : Optional[int], optional Size of variable. Default is `None`. When it is `None`, size is inferred from the vector information of the prior. - mean : Union[float, np.ndarray], default=0 + mean : Union[float, NDArray], default=0 Mean of the Gaussian prior. Default is 0. If it is a scalar, it will be extended to an array with `self.size`. - sd : Union[float, np.ndarray], default=np.inf + sd : Union[float, NDArray], default=np.inf Standard deviation of the Gaussian prior. Default is `np.inf`. If it is a scalar, it will be extended to an array with `self.size`. @@ -84,9 +85,9 @@ class GaussianPrior(Prior): ---------- size : int Size of the variable. - mean : ndarray + mean : NDArray Mean of the Gaussian prior. - sd : ndarray + sd : NDArray Standard deviation of the Gaussian prior. Raises @@ -97,10 +98,11 @@ class GaussianPrior(Prior): Raised when size of the standard deviation vector doesn't match. ValueError Raised when any value in standard deviation vector is non-positive. + """ - mean: np.ndarray = field(default=0.0, repr=False) - sd: np.ndarray = field(default=np.inf, repr=False) + mean: NDArray = field(default=0.0, repr=False) + sd: NDArray = field(default=np.inf, repr=False) def __post_init__(self): self.process_size([self.mean, self.sd]) @@ -127,10 +129,10 @@ class UniformPrior(Prior): size : Optional[int], optional Size of variable. Default is `None`. When it is `None`, size is inferred from the vector information of the prior. - lb : Union[float, np.ndarray], default=-np.inf + lb : Union[float, NDArray], default=-np.inf Lower bound of Uniform prior. Default is `-np.inf`. If it is a scalar, it will be extended to an array with `self.size`. - ub : Union[float, np.ndarray], default=np.inf + ub : Union[float, NDArray], default=np.inf Upper bound of the Uniform prior. Default is `np.inf`. If it is a scalar,it will be extended to an array with `self.size`. @@ -138,9 +140,9 @@ class UniformPrior(Prior): ---------- size : int Size of the variable. - lb : ndarray + lb : NDArray Lower bound of Uniform prior. - ub : ndarray + ub : NDArray Upper bound of Uniform prior. Raises @@ -151,10 +153,11 @@ class UniformPrior(Prior): Raised when size of the upper bound vector doesn't match. ValueError Raised if lower bound is greater than upper bound. + """ - lb: np.ndarray = field(default=-np.inf, repr=False) - ub: np.ndarray = field(default=np.inf, repr=False) + lb: NDArray = field(default=-np.inf, repr=False) + ub: NDArray = field(default=np.inf, repr=False) def __post_init__(self): self.process_size([self.lb, self.ub]) @@ -169,9 +172,7 @@ def __post_init__(self): if self.ub.size != self.size: raise ValueError("Upper bound vector size does not match.") if any(self.lb > self.ub): - ValueError( - "Lower bounds must be less than or equal to upper bounds." - ) + ValueError("Lower bounds must be less than or equal to upper bounds.") @dataclass @@ -180,7 +181,7 @@ class LinearPrior: Parameters ---------- - mat : np.ndarray, optional + mat : NDArray, optional Linear mapping for the prior. Default is an empty matrix. size : Optional[int], optional Size of the prior. Default is `None`. If it is `None`, the size will @@ -188,7 +189,7 @@ class LinearPrior: Attributes ---------- - mat : np.ndarray + mat : NDArray Linear mapping for the prior. size : int Size of the prior. @@ -197,10 +198,10 @@ class LinearPrior: ------- is_empty() Indicate if the prior is empty. + """ - mat: np.ndarray = field(default_factory=lambda: np.empty(shape=(0, 1)), - repr=False) + mat: NDArray = field(default_factory=lambda: np.empty(shape=(0, 1)), repr=False) size: int = None def __post_init__(self): @@ -261,6 +262,7 @@ class SplinePrior(LinearPrior): ------- attach_spline(spline) Attach the spline to process the domain. + """ size: int = 100 @@ -270,13 +272,14 @@ class SplinePrior(LinearPrior): domain_type: str = field(default="rel", repr=False) def __post_init__(self): - assert self.domain_lb <= self.domain_ub, \ - "Domain lower bound must be less or equal than upper bound." - assert self.domain_type in ["rel", "abs"], \ - "Domain type must be 'rel' or 'abs'." + assert ( + self.domain_lb <= self.domain_ub + ), "Domain lower bound must be less or equal than upper bound." + assert self.domain_type in ["rel", "abs"], "Domain type must be 'rel' or 'abs'." if self.domain_type == "rel": - assert self.domain_lb >= 0.0 and self.domain_ub <= 1.0, \ - "Using relative domain, bounds must be numbers between 0 and 1." + assert ( + self.domain_lb >= 0.0 and self.domain_ub <= 1.0 + ), "Using relative domain, bounds must be numbers between 0 and 1." def attach_spline(self, spline: XSpline): """Attach the spline to process the domain. @@ -285,18 +288,20 @@ def attach_spline(self, spline: XSpline): ---------- spline : XSpline Spline used to create the linear mapping for the prior. + """ knots_lb = spline.knots[0] knots_ub = spline.knots[-1] if self.domain_type == "rel": - points_lb = knots_lb + (knots_ub - knots_lb)*self.domain_lb - points_ub = knots_lb + (knots_ub - knots_lb)*self.domain_ub + points_lb = knots_lb + (knots_ub - knots_lb) * self.domain_lb + points_ub = knots_lb + (knots_ub - knots_lb) * self.domain_ub else: points_lb = self.domain_lb points_ub = self.domain_ub points = np.linspace(points_lb, points_ub, self.size) - self.mat = spline.design_dmat(points, order=self.order, - l_extra=True, r_extra=True) + self.mat = spline.design_dmat( + points, order=self.order, l_extra=True, r_extra=True + ) super().__post_init__() diff --git a/src/regmod/utils.py b/src/regmod/utils.py index 1a9875b..f01d3d3 100644 --- a/src/regmod/utils.py +++ b/src/regmod/utils.py @@ -1,17 +1,18 @@ """ Utility classes and functions """ + from dataclasses import dataclass -from typing import Any, List, Optional import numpy as np from xspline import XSpline +from regmod._typing import Any, NDArray + -def default_vec_factory(vec: Any, - size: int, - default_value: float, - vec_name: str = 'vec') -> np.ndarray: +def default_vec_factory( + vec: Any, size: int, default_value: float, vec_name: str = "vec" +) -> NDArray: """Validate or create the vector. Parameters @@ -30,12 +31,13 @@ def default_vec_factory(vec: Any, Returns ------- - np.ndarray + NDArray Created or validated vector. Notes ----- This function should be replaced in the next version. + """ if vec is None or np.isscalar(vec): @@ -46,12 +48,12 @@ def default_vec_factory(vec: Any, return vec -def check_size(vec: np.ndarray, size: int, vec_name: str = 'vec') -> None: +def check_size(vec: NDArray, size: int, vec_name: str = "vec") -> None: """Check the size of the vector. Parameters ---------- - vec : np.ndarray + vec : NDArray Vector to be validated. size : int The assumption size of the vector. @@ -66,6 +68,7 @@ def check_size(vec: np.ndarray, size: int, vec_name: str = 'vec') -> None: Notes ----- This function should be replaced in the next version. + """ assert len(vec) == size, f"{vec_name} must length {size}." @@ -76,7 +79,7 @@ class SplineSpecs: Attributes ---------- - knots : np.ndarray + knots : NDArray Knots placement of the spline. Depends on `knots_type` this will be used differently. degree : int, default=3 @@ -104,9 +107,10 @@ class SplineSpecs: ------- create_spline(vec) Create the spline from given vector as the data. + """ - knots: np.ndarray + knots: NDArray degree: int = 3 l_linear: bool = False r_linear: bool = False @@ -114,22 +118,26 @@ class SplineSpecs: knots_type: str = "abs" def __post_init__(self): - assert self.knots_type in ["abs", "rel_domain", "rel_freq"], \ - "Knots type must be one of 'abs', 'rel_domain' or 'rel_freq'." + assert self.knots_type in [ + "abs", + "rel_domain", + "rel_freq", + ], "Knots type must be one of 'abs', 'rel_domain' or 'rel_freq'." @property def num_spline_bases(self) -> int: """Number of the spline bases.""" - inner_knots = self.knots[int(self.l_linear): - len(self.knots) - int(self.r_linear)] + inner_knots = self.knots[ + int(self.l_linear) : len(self.knots) - int(self.r_linear) + ] return len(inner_knots) - 2 + self.degree + int(self.include_first_basis) - def create_spline(self, vec: Optional[np.ndarray] = None) -> XSpline: + def create_spline(self, vec: NDArray | None = None) -> XSpline: """Create spline from the given vector. Parameters ---------- - vec : Optional[np.ndarray], optional + vec : Optional[NDArray], optional Given vector as the data. Default to `None`. When it is `None` it requires `knots_type` to be `abs`. @@ -142,31 +150,36 @@ def create_spline(self, vec: Optional[np.ndarray] = None) -> XSpline: ------- XSpline Spline object. + """ if self.knots_type == "abs": knots = self.knots else: - assert vec is not None, \ - "Using relative knots, must provide a vector to finalize knots." + assert ( + vec is not None + ), "Using relative knots, must provide a vector to finalize knots." if self.knots_type == "rel_domain": lb = np.min(vec) ub = np.max(vec) - knots = lb + self.knots*(ub - lb) + knots = lb + self.knots * (ub - lb) else: knots = np.quantile(vec, self.knots) - return XSpline(knots, self.degree, - l_linear=self.l_linear, - r_linear=self.r_linear, - include_first_basis=self.include_first_basis) + return XSpline( + knots, + self.degree, + l_linear=self.l_linear, + r_linear=self.r_linear, + include_first_basis=self.include_first_basis, + ) -def sizes_to_slices(sizes: List[int]) -> List[slice]: +def sizes_to_slices(sizes: list[int]) -> list[slice]: """Convert a list of sizes to a list of slices. Parameters ---------- - sizes : List[int] + sizes : list[int] A list of positive integers representing sizes of the groups. Raises @@ -176,7 +189,7 @@ def sizes_to_slices(sizes: List[int]) -> List[slice]: Returns ------- - List[slice] + list[slice] A list of slices converted from sizes. Examples @@ -190,6 +203,7 @@ def sizes_to_slices(sizes: List[int]) -> List[slice]: [0, 1, 2, 3, 4, 5] >>> [x[s] for s in slices] [[0], [1, 2], [3, 4, 5]] + """ sizes = np.asarray(sizes).astype(int) if any(sizes < 0): diff --git a/src/regmod/variable.py b/src/regmod/variable.py index 5cbf2d6..00e0964 100644 --- a/src/regmod/variable.py +++ b/src/regmod/variable.py @@ -1,18 +1,25 @@ """ Variable module """ -from collections.abc import Iterable + from copy import deepcopy from dataclasses import dataclass, field -from typing import List, Optional, Union import numpy as np -import pandas as pd from xspline import XSpline -from regmod.prior import (GaussianPrior, LinearGaussianPrior, LinearPrior, - LinearUniformPrior, Prior, SplineGaussianPrior, - SplinePrior, SplineUniformPrior, UniformPrior) +from regmod._typing import DataFrame, Iterable, NDArray +from regmod.prior import ( + GaussianPrior, + LinearGaussianPrior, + LinearPrior, + LinearUniformPrior, + Prior, + SplineGaussianPrior, + SplinePrior, + SplineUniformPrior, + UniformPrior, +) from regmod.utils import SplineSpecs @@ -26,7 +33,7 @@ class Variable: ---------- name : str Name of the variable corresponding to the column name in the data frame. - priors : List[Prior], optional + priors : list[Prior], optional A list of priors for the variable. Default is an empty list. Attributes @@ -34,7 +41,7 @@ class Variable: size name : str Name of the variable corresponding to the column name in the data frame. - priors : List[Prior] + priors : list[Prior] A list of priors for the variable. gprior : GaussianPrior Direct Gaussian prior in `priors`. @@ -65,10 +72,11 @@ class Variable: Notes ----- In the future, this class will be combined with the SplineVariable. + """ name: str - priors: List[Prior] = field(default_factory=list, repr=False) + priors: list[Prior] = field(default_factory=list, repr=False) gprior: GaussianPrior = field(default=None, init=False, repr=False) uprior: UniformPrior = field(default=None, init=False, repr=False) @@ -86,6 +94,7 @@ def process_priors(self): Raised if direct Uniform prior size not match. ValueError Raised when any prior in the list is not an instance of Prior. + """ for prior in self.priors: if isinstance(prior, LinearPrior): @@ -94,18 +103,16 @@ def process_priors(self): if self.gprior is not None: self.priors.remove(self.gprior) self.gprior = prior - assert self.gprior.size == self.size, \ - "Gaussian prior size not match." + assert self.gprior.size == self.size, "Gaussian prior size not match." elif isinstance(prior, UniformPrior): if self.uprior is not None: self.priors.remove(self.uprior) self.uprior = prior - assert self.uprior.size == self.size, \ - "Uniform prior size not match." + assert self.uprior.size == self.size, "Uniform prior size not match." else: raise ValueError("Unknown prior type.") - def check_data(self, df: pd.DataFrame): + def check_data(self, df: DataFrame): """Check if the data contains the column name `name`. Parameters @@ -132,12 +139,12 @@ def reset_priors(self) -> None: self.gprior = None self.uprior = None - def add_priors(self, priors: Union[Prior, List[Prior]]) -> None: + def add_priors(self, priors: Prior | list[Prior]) -> None: """Add priors. Parameters ---------- - priors : Union[Prior, List[Prior]] + priors : Union[Prior, list[Prior]] Priors to be added. """ if not isinstance(priors, list): @@ -145,12 +152,12 @@ def add_priors(self, priors: Union[Prior, List[Prior]]) -> None: self.priors.extend(priors) self.process_priors() - def rm_priors(self, indices: Union[int, List[int], List[bool]]) -> None: + def rm_priors(self, indices: int | list[int] | list[bool]) -> None: """Remove priors. Parameters ---------- - indices : Union[int, List[int], List[bool]] + indices : Union[int, list[int], list[bool]] Indicies of the priors that need to be removed. Indicies come in the forms of integer, list of integers or list of booleans. When it is integer or list of integers, it requires the integer is within the @@ -166,25 +173,32 @@ def rm_priors(self, indices: Union[int, List[int], List[bool]]) -> None: AssertionError Raised when `indices` is list of booleans but with different length compare to `self.priors`. + """ if isinstance(indices, int): indices = [indices] else: - assert isinstance(indices, Iterable), \ - "Indies must be int, List[int], or List[bool]." - if all([not isinstance(index, bool) and isinstance(index, int) - for index in indices]): + assert isinstance( + indices, Iterable + ), "Indies must be int, list[int], or list[bool]." + if all( + [ + not isinstance(index, bool) and isinstance(index, int) + for index in indices + ] + ): indices = [i in indices for i in range(len(self.priors))] - assert all([isinstance(index, bool) for index in indices]), \ - "Index type not consistent." - assert len(indices) == len(self.priors), \ - "Index size not match with number of priors." - self.priors = [self.priors[i] for i, index in enumerate(indices) - if not index] + assert all( + [isinstance(index, bool) for index in indices] + ), "Index type not consistent." + assert len(indices) == len( + self.priors + ), "Index size not match with number of priors." + self.priors = [self.priors[i] for i, index in enumerate(indices) if not index] self.reset_priors() self.process_priors() - def get_mat(self, df: pd.DataFrame) -> np.ndarray: + def get_mat(self, df: DataFrame) -> NDArray: """Get design matrix. Parameters @@ -194,7 +208,7 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: Returns ------- - np.ndarray + NDArray Design matrix. """ @@ -203,13 +217,14 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: return np.ones((df.shape[0], 1)) return df[[self.name]].to_numpy() - def get_gvec(self) -> np.ndarray: + def get_gvec(self) -> NDArray: """Get direct Gaussian prior vector. Returns ------- - np.ndarray + NDArray Direct Gaussian prior vector. + """ if self.gprior is None: gvec = np.repeat([[0.0], [np.inf]], self.size, axis=1) @@ -217,13 +232,14 @@ def get_gvec(self) -> np.ndarray: gvec = np.vstack([self.gprior.mean, self.gprior.sd]) return gvec - def get_uvec(self) -> np.ndarray: + def get_uvec(self) -> NDArray: """Get direct Uniform prior vector. Returns ------- - np.ndarray + NDArray Direct Uniform prior vector. + """ if self.uprior is None: uvec = np.repeat([[-np.inf], [np.inf]], self.size, axis=1) @@ -238,6 +254,7 @@ def copy(self) -> "Variable": ------- Variable Current instance. + """ return deepcopy(self) @@ -254,10 +271,10 @@ class SplineVariable(Variable): spline_specs : SplineSpecs, optional Spline settings used to create spline object. Recommend to use only when use `knots_type={'rel_domain', 'rel_freq'}. Default to be `None`. - linear_gpriors : List[LinearPrior], optional + linear_gpriors : list[LinearPrior], optional A list of linear Gaussian priors usually for shape priors of the spline. Default to be an empty list. - linear_upriors : List[LinearPrior], optional + linear_upriors : list[LinearPrior], optional A list of linear Uniform priors usually for shape priors of the spline. spline. Default to be an empty list. @@ -267,9 +284,9 @@ class SplineVariable(Variable): Spline object that in charge of creating design matrix. spline_specs : SplineSpecs Spline settings used to create spline object. - linear_gpriors : List[LinearPrior] + linear_gpriors : list[LinearPrior] A list of linear Gaussian priors usually for shape priors of the spline. - linear_upriors : List[LinearPrior] + linear_upriors : list[LinearPrior] A list of linear Uniform priors usually for shape priors of the spline. Methods @@ -292,19 +309,20 @@ class SplineVariable(Variable): Get linear Gaussian prior design matrix. get_linear_umat(data) Get linear Uniform prior design matrix. + """ spline: XSpline = field(default=None, repr=False) spline_specs: SplineSpecs = field(default=None, repr=False) - linear_gpriors: List[LinearPrior] = field(default_factory=list, repr=False) - linear_upriors: List[LinearPrior] = field(default_factory=list, repr=False) + linear_gpriors: list[LinearPrior] = field(default_factory=list, repr=False) + linear_upriors: list[LinearPrior] = field(default_factory=list, repr=False) def __post_init__(self): if (self.spline is None) and (self.spline_specs is None): raise ValueError("At least one of spline and spline_specs is not None.") self.process_priors() - def check_data(self, df: pd.DataFrame): + def check_data(self, df: DataFrame): """Check if the data contains the column name `name`. And create the spline object, if only `spline_specs` is provided. @@ -334,6 +352,7 @@ def process_priors(self): Raised if direct Uniform prior size not match. ValueError Raised when any prior in the list is not an instance of Prior. + """ for prior in self.priors: if isinstance(prior, (SplineGaussianPrior, LinearGaussianPrior)): @@ -344,14 +363,12 @@ def process_priors(self): if self.gprior is not None: self.priors.remove(self.gprior) self.gprior = prior - assert self.gprior.size == self.size, \ - "Gaussian prior size not match." + assert self.gprior.size == self.size, "Gaussian prior size not match." elif isinstance(prior, UniformPrior): if self.uprior is not None: self.priors.remove(self.uprior) self.uprior = prior - assert self.uprior.size == self.size, \ - "Uniform prior size not match." + assert self.uprior.size == self.size, "Uniform prior size not match." else: raise ValueError("Unknown prior type.") @@ -371,7 +388,7 @@ def reset_priors(self): self.linear_gpriors = list() self.linear_upriors = list() - def get_mat(self, df: pd.DataFrame) -> np.ndarray: + def get_mat(self, df: DataFrame) -> NDArray: """Get design matrix. Parameters @@ -381,7 +398,7 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: Returns ------- - np.ndarray + NDArray Design matrix. """ @@ -389,41 +406,41 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: cov = df[self.name].to_numpy() return self.spline.design_mat(cov, l_extra=True, r_extra=True) - def get_linear_uvec(self) -> np.ndarray: + def get_linear_uvec(self) -> NDArray: """Get linear Uniform prior vector. Returns ------- - np.ndarray + NDArray Linear uniform prior vector. + """ if not self.linear_upriors: uvec = np.empty((2, 0)) else: - uvec = np.hstack([ - np.vstack([prior.lb, prior.ub]) - for prior in self.linear_upriors - ]) + uvec = np.hstack( + [np.vstack([prior.lb, prior.ub]) for prior in self.linear_upriors] + ) return uvec - def get_linear_gvec(self) -> np.ndarray: + def get_linear_gvec(self) -> NDArray: """Get linear Gaussian prior vector. Returns ------- - np.ndarray + NDArray Linear Gaussian prior vector. + """ if not self.linear_gpriors: gvec = np.empty((2, 0)) else: - gvec = np.hstack([ - np.vstack([prior.mean, prior.sd]) - for prior in self.linear_gpriors - ]) + gvec = np.hstack( + [np.vstack([prior.mean, prior.sd]) for prior in self.linear_gpriors] + ) return gvec - def get_linear_umat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: + def get_linear_umat(self, data: DataFrame | None = None) -> NDArray: """Get linear Uniform prior design matrix. Parameters @@ -438,8 +455,9 @@ def get_linear_umat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: Returns ------- - np.ndarray: + NDArray: Linear Uniform prior design matrix. + """ if not self.linear_upriors: umat = np.empty((0, self.size)) @@ -447,12 +465,10 @@ def get_linear_umat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: if self.spline is None: assert data is not None, "Must check data to create spline first." self.check_data(data) - umat = np.vstack([ - prior.mat for prior in self.linear_upriors - ]) + umat = np.vstack([prior.mat for prior in self.linear_upriors]) return umat - def get_linear_gmat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: + def get_linear_gmat(self, data: DataFrame | None = None) -> NDArray: """Get linear Gaussian prior design matrix. Parameters @@ -467,8 +483,9 @@ def get_linear_gmat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: Returns ------- - np.ndarray: + NDArray: Linear Gaussian prior design matrix. + """ if not self.linear_gpriors: gmat = np.empty((0, self.size)) @@ -476,7 +493,5 @@ def get_linear_gmat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: if self.spline is None: assert data is not None, "Must check data to create spline first." self.check_data(data) - gmat = np.vstack([ - prior.mat for prior in self.linear_gpriors - ]) + gmat = np.vstack([prior.mat for prior in self.linear_gpriors]) return gmat