Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analyzer - Generate baseline given results from multiple nodes. #573

Closed
wants to merge 13 commits into from
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,12 @@ def run(self):
'matplotlib>=3.0.0',
'natsort>=7.1.1',
'networkx>=2.5',
'numpy>=1.19.2',
'numpy>=1.20.3',
'omegaconf==2.0.6',
'openpyxl>=3.0.7',
'pandas>=1.1.5',
'pssh @ git+https://github.com/lilydjwg/[email protected]',
'python-dateutil>=2.8.2'
'pyyaml>=5.3',
'requests>=2.27.1',
'seaborn>=0.11.2',
Expand Down
2 changes: 1 addition & 1 deletion superbench/analyzer/file_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def read_raw_data(raw_data_path):
raw_data_df = raw_data_df.rename(raw_data_df['node'])
raw_data_df = raw_data_df.drop(columns=['node'])
except Exception as e:
logger.log_and_raise(exception=IOError, msg='Analyzer: invalid raw data fomat - {}'.format(str(e)))
logger.log_and_raise(exception=IOError, msg='Analyzer: invalid raw data format - {}'.format(str(e)))
return raw_data_df


Expand Down
248 changes: 248 additions & 0 deletions superbench/analyzer/generate_baseline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
# Copyright (c) Microsoft Corporation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you please add a document for this function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you please add test for this function

# Licensed under the MIT license.

"""A module for baseline generation."""

import argparse
from copy import deepcopy
import json
import re

from joblib import Parallel, delayed
import pandas as pd

from superbench.common.utils import logger
from superbench.analyzer import file_handler
from superbench.analyzer import data_analysis
from superbench.analyzer import DataDiagnosis
from superbench.analyzer import ResultSummary
from superbench.analyzer.diagnosis_rule_op import RuleOp, DiagnosisRuleType
from superbench.benchmarks.context import Enum


class BaselineAlgoType(Enum):
"""The Enum class representing different baseline generation algorithm."""

MEAN = 'mean'
FIX_THRESHOLD = 'fix_threshold'


class GenerateBaseline(DataDiagnosis):
"""The class to generate baseline for raw data."""

def fix_threshold_outlier_detection(self, data_series, single_metric_with_baseline, metric, rule_op):
"""Fix threshold outlier detection algorithm.

Step 0: Put all data in the collection
Step 1: Regenerate the collection
Calculate the average number in the collection as the baseline
Remove all data which cannot pass the fix threshold based on the new baseline
Step 2: If no data has been removed from Step 1, go to Step 3; otherwise, go to Step 1
Step 3: Use the baseline and fix threshold for Outlier Detection

Args:
data_series (pd.Series): data of the metric
single_metric_with_baseline (dict): baseline of the single metric in 'metrics' in 2-layer dict format
metric (str): the name of the metric to execute the algorithm
rule_op (function): diagnosis rule op function

Returns:
tuple: the baseline of the metric, normal data of the metric
"""
if single_metric_with_baseline['metrics'][metric] != None and single_metric_with_baseline['metrics'][metric
] != -1:
return single_metric_with_baseline['metrics'][metric]
tmp_single_metric_with_baseline = deepcopy(single_metric_with_baseline)
tmp_single_metric_with_baseline['metrics'] = {}
clean = False
while clean is False:
clean = True
baseline_val = data_series.mean()
for val in data_series.index:
tmp_single_metric_with_baseline['metrics'][metric] = baseline_val
if baseline_val == 0:
break
data_row = pd.Series([data_series[val]], index=[metric])
details = []
categories = set()
summary_data_row = pd.Series(index=[metric], dtype=float)
violated_num = rule_op(data_row, tmp_single_metric_with_baseline, summary_data_row, details, categories)
if violated_num:
data_series = data_series.drop(val)
clean = False
baseline = tmp_single_metric_with_baseline['metrics'][metric]
return baseline, data_series

def get_aggregate_data(self, raw_data_file, summary_rule_file):
r"""Aggregate raw data according to the summary rule file.

If the metric is aggregated by rank (:\d+), remove the rank info to generate the metric name and aggregate data
If the metric is aggregated by pattern in regex, aggregate the data and copy to all metrics which match this pattern

Args:
raw_data_file (str): the file name of the raw data file
summary_rule_file (str): the file name of the summary rule file

Returns:
DataFrame: aggregated data
"""
self.rs = ResultSummary()
rules = self.rs._preprocess(raw_data_file, summary_rule_file)
# parse rules for result summary
if not self.rs._parse_rules(rules):
return
aggregated_df = pd.DataFrame()
for rule in self.rs._sb_rules:
single_metric_rule = self.rs._sb_rules[rule]
metrics = list(single_metric_rule['metrics'].keys())
data_df_of_rule = self.rs._raw_data_df[metrics]
if self.rs._sb_rules[rule]['aggregate']:
# if aggregate is True, aggregate in ranks
if self.rs._sb_rules[rule]['aggregate'] is True:
data_df_of_rule = data_analysis.aggregate(data_df_of_rule)
# if aggregate is not empty and is a pattern in regex, aggregate according to pattern
else:
pattern = self.rs._sb_rules[rule]['aggregate']
data_df_of_rule_with_short_name = data_analysis.aggregate(data_df_of_rule, pattern)
data_df_of_rule = pd.DataFrame(columns=metrics)
# restore the columns of data_fd to full metric names
for metric in metrics:
short = ''
match = re.search(pattern, metric)
if match:
metric_in_list = list(metric)
for i in range(1, len(match.groups()) + 1):
metric_in_list[match.start(i):match.end(i)] = '*'
short = ''.join(metric_in_list)
data_df_of_rule[metric] = data_df_of_rule_with_short_name[short]
aggregated_df = pd.concat([aggregated_df, data_df_of_rule], axis=1)
return aggregated_df

def generate_baseline(self, algo, aggregated_df, diagnosis_rule_file, baseline):
"""Generate the baseline in json format.

Args:
algo (str): the algorithm to generate the baseline
aggregated_df (DataFrame): aggregated data
diagnosis_rule_file (str): the file name of the diagnosis rules which used in fix_threshold algorithm
baseline (dict): existing baseline of some metrics

Returns:
dict: baseline of metrics defined in diagnosis_rule_files for fix_threshold algorithm or defined in rule_summary_files for mean
"""
baseline = {}
# re-organize metrics by benchmark names
self._benchmark_metrics_dict = self._get_metrics_by_benchmarks(list(self._raw_data_df.columns))
if algo == 'mean':
mean_df = self._raw_data_df.mean()
for metric in self._raw_data_df.columns:
if metric in baseline:
return baseline[metric]
baseline[metric] = mean_df[metric]
elif algo == 'fix_threshold':
# read diagnosis rules
rules = file_handler.read_rules(diagnosis_rule_file)
if not self._parse_rules_and_baseline(rules, baseline):
return baseline
else:
for rule in self._sb_rules:
single_metric_rule = self._sb_rules[rule]
metrics = list(single_metric_rule['metrics'].keys())
function_name = self._sb_rules[rule]['function']
rule_op = RuleOp.get_rule_func(DiagnosisRuleType(function_name))
outputs = Parallel(n_jobs=-1)(
delayed(self.fix_threshold_outlier_detection)
(aggregated_df[metric], single_metric_rule, metric, rule_op) for metric in metrics
)
for index, out in enumerate(outputs):
baseline[metrics[index]] = out[0]
aggregated_df[metrics[index]] = out[1]
return baseline

def run(
self,
raw_data_file,
summary_rule_file,
output_dir,
algorithm='mean',
diagnosis_rule_file=None,
baseline_file=None,
digit=2
):
"""Export baseline to json file.

If diagnosis_rule_file is None, use mean of the data as baseline.
If diagnosis_rule_file is not None, use the rules in diagnosis_rule_file to execute fix_threshold algorithm.

Args:
raw_data_df (DataFrame): raw data
summary_rule_file (str): the file name of the summary rule file
output_dir (str): the directory to save the baseline file
algorithm (str): the algorithm to generate the baseline
diagnosis_rule_file (str): the file name of the diagnosis rules which used in fix_threshold algorithm
baseline_file (str): the file name of the baseline file
digit (int): the number of digits after the decimal point
"""
try:
# aggregate results from different devices
self._raw_data_df = self.get_aggregate_data(raw_data_file, summary_rule_file)
# read existing baseline
baseline = {}
if baseline_file:
baseline = file_handler.read_baseline()
# generate baseline accordint to rules in diagnosis and fix threshold outlier detection method
baseline = self.generate_baseline(algorithm, self._raw_data_df, diagnosis_rule_file, baseline)
for metric in baseline:
val = baseline[metric]
if isinstance(self._raw_data_df[metric].iloc[0], float):
baseline[metric] = f'%.{digit}g' % val if abs(val) < 1 else f'%.{digit}f' % val
elif isinstance(self._raw_data_df[metric].iloc[0], int):
baseline[metric] = int(val)
else:
try:
baseline[metric] = float(val)
except Exception as e:
logger.error('Analyzer: {} baseline is not numeric, msg: {}'.format(metric, str(e)))
baseline = json.dumps(baseline, indent=2, sort_keys=True)
baseline = re.sub(r': \"(\d+.?\d*)\"', r': \1', baseline)
with open(output_dir + '/baseline.json', mode='w') as f:
f.write(baseline)

except Exception as e:
logger.error('Analyzer: generate baseline failed, msg: {}'.format(str(e)))


if __name__ == '__main__':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you please remove the main()? It should be the feature for sb command

global args
parser = argparse.ArgumentParser()
parser.add_argument(
'--algo',
type=str,
default='fix_threshold',
required=False,
help='Algorithm to generate baseline, eg, mean/fix_threshold.'
)
parser.add_argument(
'--input_dir',
type=str,
default=None,
required=False,
help='Input directory which stores the results-summary.jsonl.'
)
parser.add_argument(
'--diagnosis_rule_file', type=str, default=None, required=False, help='The input path of diagnosis rule file.'
)
parser.add_argument(
'--summary_rule_file', type=str, default=None, required=False, help='The input path of summary rule file.'
)
args = parser.parse_args()
folder = args.input_dir
if args.algo == 'mean':
# simply use mean, need result_summary rules to define how to aggregate the metrics.
print('Generate baseine using mean of the data.')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use logger?

elif args.algo == 'fix_threshold':
# use fix threshold method, need result_summary rules to define how to aggregate the metrics and diagnosis_rules.yaml to define the rules for the metrics.
print('Generate baseine using fix threshold algorithm, the threshold is defined in rules/diagnosis_rules.yaml.')
GenerateBaseline().run(
folder + '/results-summary.jsonl', args.summary_rule_file, folder, 'fix_threshold', args.diagnosis_rule_file
)
Loading
Loading