# Test/unit tests pareto optimizer #1109

Open · wants to merge 6 commits into base `robynpy_release`
184 changes: 184 additions & 0 deletions python/spec/src/robyn/modeling/pareto/pareto_optimizer.md
@@ -0,0 +1,184 @@
# CLASS
## ParetoResult
* Holds the results of Pareto optimization for marketing mix models.
* It is a data class that stores several attributes related to Pareto-optimal solutions.
* Attributes:
- `pareto_solutions (List[str])`: A list of solution IDs that are Pareto-optimal.
- `pareto_fronts (int)`: The number of Pareto fronts considered in the optimization.
- `result_hyp_param (pd.DataFrame)`: Hyperparameters of Pareto-optimal solutions.
- `x_decomp_agg (pd.DataFrame)`: Aggregated decomposition results for Pareto-optimal solutions.
- `result_calibration (Optional[pd.DataFrame])`: Calibration results, if calibration was performed.
- `media_vec_collect (pd.DataFrame)`: Collected media vectors for all Pareto-optimal solutions.
- `x_decomp_vec_collect (pd.DataFrame)`: Collected decomposition vectors for all Pareto-optimal solutions.
- `plot_data_collect (Dict[str, pd.DataFrame])`: Data for various plots, keyed by plot type.
- `df_caov_pct_all (pd.DataFrame)`: Carryover percentage data for all channels and Pareto-optimal solutions.
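A minimal sketch of how `ParetoResult` could be declared as a Python data class. Field names and types come from the attribute list above; the constructed values are placeholders, not real model output:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

import pandas as pd


@dataclass
class ParetoResult:
    pareto_solutions: List[str]                 # Pareto-optimal solution IDs
    pareto_fronts: int                          # number of fronts considered
    result_hyp_param: pd.DataFrame              # hyperparameters per solution
    x_decomp_agg: pd.DataFrame                  # aggregated decomposition
    result_calibration: Optional[pd.DataFrame]  # None if no calibration was run
    media_vec_collect: pd.DataFrame
    x_decomp_vec_collect: pd.DataFrame
    plot_data_collect: Dict[str, pd.DataFrame]  # keyed by plot type
    df_caov_pct_all: pd.DataFrame               # carryover percentages


# Placeholder construction to show the shape of the object
result = ParetoResult(
    pareto_solutions=["1_2_3"],
    pareto_fronts=1,
    result_hyp_param=pd.DataFrame({"sol_id": ["1_2_3"]}),
    x_decomp_agg=pd.DataFrame(),
    result_calibration=None,
    media_vec_collect=pd.DataFrame(),
    x_decomp_vec_collect=pd.DataFrame(),
    plot_data_collect={},
    df_caov_pct_all=pd.DataFrame(),
)
```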

## ParetoData
* This class holds data necessary for Pareto optimization in marketing mix models.
* It is a data class that contains attributes for handling decomposition of spending and hyperparameters.
* Attributes:
- `decomp_spend_dist (pd.DataFrame)`: Decomposed spending distribution.
- `result_hyp_param (pd.DataFrame)`: Result hyperparameters.
- `x_decomp_agg (pd.DataFrame)`: Aggregated decomposition results.
- `pareto_fronts (List[int])`: List of Pareto fronts.
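The same pattern applies to `ParetoData`; a sketch with illustrative placeholder values:

```python
from dataclasses import dataclass
from typing import List

import pandas as pd


@dataclass
class ParetoData:
    decomp_spend_dist: pd.DataFrame  # decomposed spending distribution
    result_hyp_param: pd.DataFrame   # result hyperparameters
    x_decomp_agg: pd.DataFrame       # aggregated decomposition results
    pareto_fronts: List[int]         # fronts retained for analysis


pareto_data = ParetoData(
    decomp_spend_dist=pd.DataFrame({"rn": ["tv"], "mean_spend": [100.0]}),
    result_hyp_param=pd.DataFrame({"sol_id": ["1_1_1"]}),
    x_decomp_agg=pd.DataFrame({"sol_id": ["1_1_1"], "rn": ["tv"]}),
    pareto_fronts=[1, 2],
)
```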

## ParetoOptimizer
* Performs Pareto optimization on marketing mix models.
* This class orchestrates the Pareto optimization process, including data aggregation, Pareto front calculation, response curve calculation, and plot data preparation.
* Attributes:
- `mmm_data (MMMData)`: Input data for the marketing mix model.
- `model_outputs (ModelOutputs)`: Output data from the model runs.
- `response_calculator (ResponseCurveCalculator)`: Calculator for response curves.
- `carryover_calculator (ImmediateCarryoverCalculator)`: Calculator for immediate and carryover effects.
- `pareto_utils (ParetoUtils)`: Utility functions for Pareto-related calculations.

# CONSTRUCTORS
## ParetoOptimizer `(mmm_data: MMMData, model_outputs: ModelOutputs, hyper_parameter: Hyperparameters, featurized_mmm_data: FeaturizedMMMData, holidays_data: HolidaysData)`
* Initializes the ParetoOptimizer with the necessary data for optimization.

### USAGE
* Use this constructor when you need to perform Pareto optimization on marketing mix models using the specified inputs.

### IMPL
* Initializes the following attributes:
- `self.mmm_data`: Stores the input marketing mix model data.
- `self.model_outputs`: Stores the output data from model runs.
- `self.hyper_parameter`: Stores the hyperparameters for the model runs.
- `self.featurized_mmm_data`: Stores the featurized marketing mix model data.
- `self.holidays_data`: Stores the holidays data.
- `self.transformer`: Initialized as a `Transformation` instance using `mmm_data`.

# METHODS
## `optimize(pareto_fronts: str = "auto", min_candidates: int = 100, calibration_constraint: float = 0.1, calibrated: bool = False) -> ParetoResult`
### USAGE
* Parameters:
- `pareto_fronts (str)`: Number of Pareto fronts to consider or "auto" for automatic selection.
- `min_candidates (int)`: Minimum number of candidates to consider when using "auto" Pareto fronts.
- `calibration_constraint (float)`: Constraint for calibration, used if models are calibrated.
- `calibrated (bool)`: Whether the models have undergone calibration.
* Use this method to perform the entire Pareto optimization process.

### IMPL
* Calls `_aggregate_model_data` to aggregate model data based on calibration status.
* Computes Pareto fronts using `_compute_pareto_fronts` with aggregated data and specified parameters.
* Prepares Pareto data using `prepare_pareto_data` with aggregated data and specified parameters.
* Computes response curves with `_compute_response_curves` using Pareto data and aggregated data.
* Generates plot data with `_generate_plot_data` using aggregated data and Pareto data.
* Returns a `ParetoResult` containing the optimization results.
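The call order above can be sketched with stand-in steps. Every function body here is a hypothetical placeholder that only demonstrates how data flows through the pipeline, not the real implementation:

```python
def _aggregate_model_data(calibrated):
    # Placeholder: would concatenate per-trial outputs into DataFrames
    return {"calibrated": calibrated}


def _compute_pareto_fronts(aggregated):
    aggregated["fronts"] = [1]  # placeholder front assignment
    return aggregated


def prepare_pareto_data(aggregated):
    # Placeholder: would filter data to the selected fronts
    return {"pareto_fronts": aggregated["fronts"]}


def _compute_response_curves(pareto_data):
    pareto_data["curves"] = "computed"  # placeholder
    return pareto_data


def _generate_plot_data(pareto_data):
    return {"plots": pareto_data}


def optimize(calibrated=False):
    # Mirrors the documented step order of ParetoOptimizer.optimize
    aggregated = _aggregate_model_data(calibrated)
    aggregated = _compute_pareto_fronts(aggregated)
    pareto_data = prepare_pareto_data(aggregated)
    pareto_data = _compute_response_curves(pareto_data)
    return _generate_plot_data(pareto_data)


result = optimize()
```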

## `_aggregate_model_data(calibrated: bool) -> Dict[str, pd.DataFrame]`
### USAGE
* Parameter:
- `calibrated (bool)`: Indicates whether calibration was performed.
* Aggregates and prepares data from model outputs for Pareto optimization.

### IMPL
* Extracts hyperparameters, decomposition results, and calibration data from model outputs.
* Concatenates data into DataFrames using `pd.concat`.
* Adds iteration numbers based on whether hyperparameters are fixed.
* Merges bootstrap results with `xDecompAgg` if available.
* Returns a dictionary containing aggregated data, including 'result_hyp_param', 'x_decomp_agg', and 'result_calibration'.
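The concatenation and iteration-numbering steps can be sketched as follows. The `sol_id`/`nrmse` columns are illustrative, not the full schema:

```python
import pandas as pd

# Hypothetical per-trial hyperparameter results from two model trials
trial1 = pd.DataFrame({"sol_id": ["1_1_1", "1_2_1"], "nrmse": [0.10, 0.12]})
trial2 = pd.DataFrame({"sol_id": ["2_1_1", "2_2_1"], "nrmse": [0.09, 0.11]})

# Concatenate into one frame, then number iterations sequentially
# (in the real method this depends on whether hyperparameters are fixed)
result_hyp_param = pd.concat([trial1, trial2], ignore_index=True)
result_hyp_param["iterations"] = range(1, len(result_hyp_param) + 1)
```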

## `_compute_pareto_fronts(aggregated_data: Dict[str, pd.DataFrame], pareto_fronts: str, min_candidates: int, calibration_constraint: float) -> pd.DataFrame`
### USAGE
* Parameters:
- `aggregated_data (Dict[str, pd.DataFrame])`: Aggregated model data.
- `pareto_fronts (str)`: Number of Pareto fronts to compute or "auto".
- `min_candidates (int)`: Minimum number of candidates for automatic selection.
- `calibration_constraint (float)`: Calibration constraint.
* Calculates Pareto fronts from aggregated data.

### IMPL
* Filters and groups data to calculate coefficients and quantiles.
* Identifies Pareto-optimal solutions based on NRMSE, DECOMP.RSSD, and MAPE criteria.
* Assigns solutions to Pareto fronts.
* Computes combined weighted error scores using `ParetoUtils.calculate_errors_scores`.
* Returns a DataFrame of Pareto-optimal solutions with their corresponding front numbers.
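The front assignment can be illustrated with a simplified non-dominated sort. This sketch minimizes only two error metrics (the real method also applies the MAPE/calibration constraint), and the column names are assumptions:

```python
import pandas as pd


def pareto_front_ranks(df, x="nrmse", y="decomp_rssd", max_fronts=2):
    """Assign front numbers by repeatedly peeling off non-dominated points
    (lower is better on both error metrics)."""
    df = df.copy()
    df["front"] = pd.NA
    remaining = df.index.tolist()
    front = 1
    while remaining and front <= max_fronts:
        nondominated = []
        for i in remaining:
            # i is dominated if some j is no worse on both metrics
            # and strictly better on at least one
            dominated = any(
                df.loc[j, x] <= df.loc[i, x]
                and df.loc[j, y] <= df.loc[i, y]
                and (df.loc[j, x] < df.loc[i, x] or df.loc[j, y] < df.loc[i, y])
                for j in remaining
                if j != i
            )
            if not dominated:
                nondominated.append(i)
        df.loc[nondominated, "front"] = front
        remaining = [i for i in remaining if i not in nondominated]
        front += 1
    return df


models = pd.DataFrame({
    "sol_id": ["a", "b", "c", "d"],
    "nrmse": [0.10, 0.20, 0.15, 0.30],
    "decomp_rssd": [0.30, 0.10, 0.20, 0.40],
})
ranked = pareto_front_ranks(models)
```

Here `a`, `b`, and `c` all land on front 1 (none dominates another on both metrics), while `d` is dominated and falls to front 2.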

## `prepare_pareto_data(aggregated_data: Dict[str, pd.DataFrame], pareto_fronts: str, min_candidates: int, calibrated: bool) -> ParetoData`
### USAGE
* Parameters:
- `aggregated_data (Dict[str, pd.DataFrame])`: Aggregated data for processing.
- `pareto_fronts (str)`: Pareto fronts configuration.
- `min_candidates (int)`: Minimum candidates for selection.
- `calibrated (bool)`: Indicates calibration status.
* Prepares data for Pareto optimization.

### IMPL
* Merges decomposed spend distribution with result hyperparameters.
* Handles automatic Pareto front selection and filtering based on configuration.
* Returns a `ParetoData` instance with filtered data for selected Pareto fronts.
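The "auto" selection logic can be sketched as a cumulative count over fronts: keep fronts until the running candidate total reaches the minimum. The per-front counts here are made up:

```python
import pandas as pd

# Hypothetical count of candidate models per Pareto front
grouped = pd.DataFrame({"robynPareto": [1, 2, 3, 4], "n": [40, 35, 50, 80]})
grouped["n_cum"] = grouped["n"].cumsum()

min_candidates = 100
# First front whose cumulative count reaches the minimum closes the selection
auto_pareto = grouped[grouped["n_cum"] >= min_candidates]
selected_fronts = list(range(1, int(auto_pareto["robynPareto"].iloc[0]) + 1))
```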

## `run_dt_resp(row: pd.Series, paretoData: ParetoData) -> Optional[dict]`
### USAGE
* Parameters:
- `row (pd.Series)`: A row of Pareto data.
- `paretoData (ParetoData)`: Pareto data instance.
* Calculates response curves for a given row; designed to be run in parallel.

### IMPL
* Calculates response curves using `ResponseCurveCalculator`.
* Computes mean adstocked spend and mean carryover values.
* Returns the computed response, spend, and carryover values in a dictionary.
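The mean response is evaluated with a Hill-saturation objective. A simplified sketch of `ParetoUtils.calculate_fx_objective` as referenced above — the real utility takes additional arguments, and the values here are placeholders:

```python
def calculate_fx_objective(x, coeff, alpha, inflexion):
    # Hill saturation (diminishing returns in spend x),
    # scaled by the channel's regression coefficient
    return coeff * (x**alpha / (x**alpha + inflexion**alpha))


# At spend = 100 with inflexion point 50, saturation is 0.8,
# so the response is 2.0 * 0.8 = 1.6
mean_response = calculate_fx_objective(x=100.0, coeff=2.0, alpha=2.0, inflexion=50.0)
```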

## `_compute_response_curves(pareto_data: ParetoData, aggregated_data: Dict[str, pd.DataFrame]) -> ParetoData`
### USAGE
* Parameters:
- `pareto_data (ParetoData)`: Pareto data for processing.
- `aggregated_data (Dict[str, pd.DataFrame])`: Aggregated data for reference.
* Computes response curves for Pareto-optimal solutions.

### IMPL
* Utilizes parallel processing to compute response curves for media variables.
* Merges computed response curves into `ParetoData`.
* Calculates ROI and CPA metrics.
* Returns updated `ParetoData` with computed response curves.

## `_generate_plot_data(aggregated_data: Dict[str, pd.DataFrame], pareto_data: ParetoData) -> Dict[str, pd.DataFrame]`
### USAGE
* Parameters:
- `aggregated_data (Dict[str, pd.DataFrame])`: Aggregated data for plotting.
- `pareto_data (ParetoData)`: Pareto data for visualization.
* Prepares data for various plots used in Pareto analysis.

### IMPL
* Iterates over Pareto fronts, generating plot data for spend vs. effect share, waterfall plots, adstock rates, and spend response curves.
* Collects media vectors, decomposition vectors, and plot data into dictionaries.
* Returns a dictionary containing plot data for visualization.

## `robyn_immcarr(pareto_data: ParetoData, result_hyp_param: pd.DataFrame, sol_id=None, start_date=None, end_date=None)`
### USAGE
* Parameters:
- `pareto_data (ParetoData)`: Pareto data for analysis.
- `result_hyp_param (pd.DataFrame)`: Result hyperparameters.
- `sol_id`: Optional solution ID.
- `start_date`: Optional start date for analysis.
- `end_date`: Optional end date for analysis.
* Analyzes immediate and carryover response, calculating their percentage shares.

### IMPL
* Extracts and processes hyperparameters for the specified solution.
* Runs transformations and decompositions on the data.
* Calculates media decomposition and carryover percentages.
* Returns a DataFrame with immediate and carryover response percentages.
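The carryover-percentage step divides per-solution carryover totals by the overall decomposed totals, column-wise. A sketch with made-up channel totals:

```python
import pandas as pd

# Hypothetical decomposed response totals per solution:
# overall totals vs. the carryover-only portion
df_total = pd.DataFrame({"sol_id": ["1_1_1"], "tv": [200.0], "search": [100.0]})
df_caov = pd.DataFrame({"sol_id": ["1_1_1"], "tv": [50.0], "search": [25.0]})

# Carryover share = carryover / total, for every channel column
df_caov_pct = df_caov.copy()
df_caov_pct.loc[:, df_caov_pct.columns[1:]] = df_caov_pct.loc[
    :, df_caov_pct.columns[1:]
].div(df_total.iloc[:, 1:].values)
```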

## `_extract_hyperparameter(hypParamSam: pd.DataFrame) -> Hyperparameters`
### USAGE
* Parameter:
- `hypParamSam (pd.DataFrame)`: DataFrame containing hyperparameters.
* Extracts hyperparameters from the provided DataFrame.

### IMPL
* Iterates over media channels, extracting relevant hyperparameters such as alphas, gammas, thetas, shapes, and scales.
* Constructs and returns a `Hyperparameters` instance with extracted values.
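The extraction loop relies on the `{channel}_{param}` column naming convention. A sketch that collects the values into a plain dictionary rather than the real `Hyperparameters` class; the channel names and values are illustrative:

```python
import pandas as pd

# Hypothetical one-row hyperparameter sample for two media channels
hyp_param_sam = pd.DataFrame({
    "tv_alphas": [2.5], "tv_gammas": [0.7], "tv_thetas": [0.3],
    "search_alphas": [1.8], "search_gammas": [0.5], "search_thetas": [0.1],
})

channels = ["tv", "search"]
extracted = {}
for ch in channels:
    # Pull each channel's parameters via the column-name convention
    extracted[ch] = {
        "alphas": hyp_param_sam[f"{ch}_alphas"].values[0],
        "gammas": hyp_param_sam[f"{ch}_gammas"].values[0],
        "thetas": hyp_param_sam[f"{ch}_thetas"].values[0],
    }
```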

## `_model_decomp(inputs) -> Dict[str, pd.DataFrame]`
### USAGE
* Parameter:
- `inputs (dict)`: Dictionary containing decomposition inputs.
* Decomposes model outputs into immediate and carryover responses.

### IMPL
* Extracts coefficients and performs decomposition on model data.
* Computes immediate and carryover responses using coefficients.
* Returns a dictionary with decomposition results, including `xDecompVec`, `mediaDecompImmediate`, and `mediaDecompCarryover`.
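The immediate/carryover split can be sketched as follows: both the total and the carryover-only saturated series are scaled by the channel coefficient, and the immediate response is the remainder. The series values are placeholders:

```python
import pandas as pd

# Hypothetical per-date saturated values for one channel
coef = 2.0
x_saturated = pd.Series([0.5, 0.6, 0.7])            # total (adstocked) saturation
x_saturated_carryover = pd.Series([0.2, 0.25, 0.3])  # carryover-only saturation

media_decomp_total = coef * x_saturated
media_decomp_carryover = coef * x_saturated_carryover
# Immediate response is the remainder of total after carryover
media_decomp_immediate = media_decomp_total - media_decomp_carryover
```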
14 changes: 14 additions & 0 deletions python/spec/tests/modeling/pareto/test_pareto_optimizer.md
@@ -0,0 +1,14 @@
# CLASS
## ParetoOptimizer
* Performs Pareto optimization on marketing mix models.
* This class orchestrates the Pareto optimization process, including data aggregation, Pareto front calculation, response curve calculation, and plot data preparation.
* Attributes:
* `mmm_data (MMMData)`: Input data for the marketing mix model.
* `model_outputs (ModelOutputs)`: Output data from the model runs.
* `response_calculator (ResponseCurveCalculator)`: Calculator for response curves.
* `carryover_calculator (ImmediateCarryoverCalculator)`: Calculator for immediate and carryover effects.
* `pareto_utils (ParetoUtils)`: Utility functions for Pareto-related calculations.

# CONSTRUCTORS
## ParetoOptimizer `(mmm_data: MMMData, model_outputs: ModelOutputs, hyper_parameter: Hyperparameters, featurized_mmm_data: FeaturizedMMMData, holidays_data: HolidaysData)`
* Use this constructor when you need to perform Pareto optimization on marketing mix models using the specified inputs.
18 changes: 4 additions & 14 deletions python/src/robyn/modeling/pareto/pareto_optimizer.py
@@ -22,7 +22,6 @@
from robyn.modeling.entities.pareto_result import ParetoResult
from tqdm import tqdm


@dataclass
class ParetoData:
decomp_spend_dist: pd.DataFrame
@@ -237,7 +236,7 @@ def prepare_pareto_data(
for trial in chunk_trials:
if trial.decomp_spend_dist is not None:
# Select only necessary columns
required_cols = ["trial", "iterNG", "iterPar", "rn", "mean_spend", "total_spend", "xDecompAgg"]
required_cols = ["trial", "iterNG", "iterPar", "rn", "mean_spend", "total_spend", "xDecompAgg", "sol_id"]
trial_data = trial.decomp_spend_dist[
[col for col in required_cols if col in trial.decomp_spend_dist.columns]
]
@@ -274,6 +273,7 @@
self.logger.info("Using single Pareto front due to fixed hyperparameters or single model")

# Automatic Pareto front selection with memory optimization
grouped_data = None
if pareto_fronts == "auto":
n_pareto = result_hyp_param["robynPareto"].notna().sum()
self.logger.info(f"Number of Pareto-optimal solutions found: {n_pareto}")
@@ -291,7 +291,6 @@
.reset_index()
)
grouped_data["n_cum"] = grouped_data["n"].cumsum()

auto_pareto = grouped_data[grouped_data["n_cum"] >= scaled_min_candidates]

if len(auto_pareto) == 0:
@@ -341,13 +340,11 @@ def run_dt_resp(self, row: pd.Series, paretoData: ParetoData) -> Optional[dict]:
get_spendname = row["rn"]
startRW = self.mmm_data.mmmdata_spec.rolling_window_start_which
endRW = self.mmm_data.mmmdata_spec.rolling_window_end_which

response_calculator = ResponseCurveCalculator(
mmm_data=self.mmm_data,
model_outputs=self.model_outputs,
hyperparameter=self.hyper_parameter,
)

response_output: ResponseOutput = response_calculator.calculate_response(
select_model=get_sol_id,
metric_name=get_spendname,
@@ -356,7 +353,6 @@
dt_coef=paretoData.x_decomp_agg,
quiet=True,
)

mean_spend_adstocked = np.mean(response_output.input_total[startRW:endRW])
mean_carryover = np.mean(response_output.input_carryover[startRW:endRW])

@@ -365,7 +361,6 @@
dt_coef = paretoData.x_decomp_agg[
(paretoData.x_decomp_agg["sol_id"] == get_sol_id) & (paretoData.x_decomp_agg["rn"] == get_spendname)
][["rn", "coef"]]

hill_calculator = HillCalculator(
mmmdata=self.mmm_data,
model_outputs=self.model_outputs,
@@ -376,7 +371,6 @@
chn_adstocked=chn_adstocked,
)
hills = hill_calculator.get_hill_params()

mean_response = ParetoUtils.calculate_fx_objective(
x=row["mean_spend"],
coeff=hills["coefs_sorted"][0],
@@ -719,7 +713,6 @@ def robyn_immcarr(
axis=1,
)
temp["sol_id"] = sol_id

vec_collect = {
"xDecompVec": temp.drop(
columns=temp.columns[temp.columns.str.endswith("_MDI") | temp.columns.str.endswith("_MDC")]
@@ -735,14 +728,12 @@
]
),
}

this = vec_collect["xDecompVecImmediate"].columns.str.replace("_MDI", "", regex=False)
vec_collect["xDecompVecImmediate"].columns = this
vec_collect["xDecompVecCarryover"].columns = this

df_caov = (vec_collect["xDecompVecCarryover"].groupby("sol_id").sum().reset_index()).drop(columns="ds")
df_total = vec_collect["xDecompVec"].groupby("sol_id").sum().reset_index().drop(columns="ds")

df_caov = (vec_collect["xDecompVecCarryover"].drop(columns="ds").groupby("sol_id").sum().reset_index())
df_total = vec_collect["xDecompVec"].drop(columns="ds").groupby("sol_id").sum().reset_index()
df_caov_pct = df_caov.copy()
df_caov_pct.loc[:, df_caov_pct.columns[1:]] = df_caov_pct.loc[:, df_caov_pct.columns[1:]].div(
df_total.iloc[:, 1:].values
@@ -777,7 +768,6 @@
["sol_id", "start_date", "end_date", "type"]
)["response"].transform("sum")
xDecompVecImmeCaov.fillna(0, inplace=True)

xDecompVecImmeCaov = xDecompVecImmeCaov.merge(df_caov_pct, on=["sol_id", "rn"], how="left")

return xDecompVecImmeCaov
16 changes: 7 additions & 9 deletions python/src/robyn/modeling/pareto/response_curve.py
@@ -72,6 +72,7 @@ def calculate_response(
dt_hyppar: pd.DataFrame = pd.DataFrame(),
dt_coef: pd.DataFrame = pd.DataFrame(),
) -> ResponseOutput:

# Determine the use case based on input parameters
usecase = self._which_usecase(metric_value, date_range)

@@ -88,7 +89,6 @@
val_list = self._check_metric_value(
metric_value, metric_name, all_values, ds_list.metric_loc
)

date_range_updated = ds_list.date_range_updated
metric_value_updated = val_list.metric_value_updated
all_values_updated = val_list.all_values_updated
@@ -135,7 +135,6 @@
m_adstockedRW = x_list.x_decayed[
self.mmm_data.mmmdata_spec.rolling_window_start_which : self.mmm_data.mmmdata_spec.rolling_window_end_which
]

if usecase == UseCase.ALL_HISTORICAL_VEC:
metric_saturated_total = self.transformation.saturation_hill(
m_adstockedRW, hill_params.alphas[0], hill_params.gammas[0]
@@ -156,12 +155,11 @@
hill_params.gammas[0],
x_marginal=input_carryover,
)

metric_saturated_immediate = metric_saturated_total - metric_saturated_carryover

# Calculate final response values
coeff = dt_coef[
(dt_coef["solID"] == select_model) & (dt_coef["rn"] == hpm_name)
(dt_coef["sol_id"] == select_model) & (dt_coef["rn"] == hpm_name)
]["coef"].values[0]
m_saturated = self.transformation.saturation_hill(
m_adstockedRW, hill_params.alphas[0], hill_params.gammas[0]
@@ -318,18 +316,18 @@ def _get_channel_hyperparams(
params = ChannelHyperparameters()

if adstock_type == AdstockType.GEOMETRIC:
params.thetas = dt_hyppar[dt_hyppar["solID"] == select_model][
params.thetas = dt_hyppar[dt_hyppar["sol_id"] == select_model][
f"{hpm_name}_thetas"
].values
elif adstock_type in [
AdstockType.WEIBULL,
AdstockType.WEIBULL_CDF,
AdstockType.WEIBULL_PDF,
]:
params.shapes = dt_hyppar[dt_hyppar["solID"] == select_model][
params.shapes = dt_hyppar[dt_hyppar["sol_id"] == select_model][
f"{hpm_name}_shapes"
].values
params.scales = dt_hyppar[dt_hyppar["solID"] == select_model][
params.scales = dt_hyppar[dt_hyppar["sol_id"] == select_model][
f"{hpm_name}_scales"
].values

@@ -339,10 +337,10 @@ def _get_saturation_params(
self, select_model: str, hpm_name: str, dt_hyppar: pd.DataFrame
) -> ChannelHyperparameters:
params = ChannelHyperparameters()
params.alphas = dt_hyppar[dt_hyppar["solID"] == select_model][
params.alphas = dt_hyppar[dt_hyppar["sol_id"] == select_model][
f"{hpm_name}_alphas"
].values
params.gammas = dt_hyppar[dt_hyppar["solID"] == select_model][
params.gammas = dt_hyppar[dt_hyppar["sol_id"] == select_model][
f"{hpm_name}_gammas"
].values
return params