Analyzer - Generate baseline given results from multiple nodes. #573
@@ -160,11 +160,12 @@ def run(self):
         'matplotlib>=3.0.0',
         'natsort>=7.1.1',
         'networkx>=2.5',
-        'numpy>=1.19.2',
+        'numpy>=1.20.3',
         'omegaconf==2.0.6',
         'openpyxl>=3.0.7',
         'pandas>=1.1.5',
         'pssh @ git+https://github.com/lilydjwg/[email protected]',
+        'python-dateutil>=2.8.2',
         'pyyaml>=5.3',
         'requests>=2.27.1',
         'seaborn>=0.11.2',
@@ -0,0 +1,248 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

Review comment: Would you please add a test for this function?

"""A module for baseline generation."""

import argparse
from copy import deepcopy
import json
import re

from joblib import Parallel, delayed
import pandas as pd

from superbench.common.utils import logger
from superbench.analyzer import file_handler
from superbench.analyzer import data_analysis
from superbench.analyzer import DataDiagnosis
from superbench.analyzer import ResultSummary
from superbench.analyzer.diagnosis_rule_op import RuleOp, DiagnosisRuleType
from superbench.benchmarks.context import Enum


class BaselineAlgoType(Enum):
    """The Enum class representing different baseline generation algorithms."""

    MEAN = 'mean'
    FIX_THRESHOLD = 'fix_threshold'


class GenerateBaseline(DataDiagnosis):
    """The class to generate a baseline for raw data."""
    def fix_threshold_outlier_detection(self, data_series, single_metric_with_baseline, metric, rule_op):
        """Fixed-threshold outlier detection algorithm.

        Step 0: Put all data in the collection.
        Step 1: Regenerate the collection:
            calculate the mean of the collection as the baseline, then
            remove all data which cannot pass the fixed threshold based on the new baseline.
        Step 2: If no data was removed in Step 1, go to Step 3; otherwise, go back to Step 1.
        Step 3: Use the baseline and fixed threshold for outlier detection.

        Args:
            data_series (pd.Series): data of the metric
            single_metric_with_baseline (dict): baseline of the single metric in 'metrics' in 2-layer dict format
            metric (str): the name of the metric to execute the algorithm
            rule_op (function): diagnosis rule op function

        Returns:
            tuple: the baseline of the metric, normal data of the metric
        """
        # reuse an existing valid baseline for this metric if there is one
        if single_metric_with_baseline['metrics'][metric] is not None \
                and single_metric_with_baseline['metrics'][metric] != -1:
            return single_metric_with_baseline['metrics'][metric], data_series
        tmp_single_metric_with_baseline = deepcopy(single_metric_with_baseline)
        tmp_single_metric_with_baseline['metrics'] = {}
        clean = False
        while clean is False:
            clean = True
            baseline_val = data_series.mean()
            for val in data_series.index:
                tmp_single_metric_with_baseline['metrics'][metric] = baseline_val
                if baseline_val == 0:
                    break
                data_row = pd.Series([data_series[val]], index=[metric])
                details = []
                categories = set()
                summary_data_row = pd.Series(index=[metric], dtype=float)
                violated_num = rule_op(data_row, tmp_single_metric_with_baseline, summary_data_row, details, categories)
                if violated_num:
                    data_series = data_series.drop(val)
                    clean = False
        baseline = tmp_single_metric_with_baseline['metrics'][metric]
        return baseline, data_series
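
To make the Step 0-3 loop above concrete, here is a minimal standalone sketch of the same iteration, assuming a plain relative-deviation check (the 10% threshold and the sample values are illustrative only) in place of the diagnosis `rule_op`:

```python
# A minimal sketch of the iterative fixed-threshold loop, assuming a simple
# relative-deviation check instead of the real diagnosis rule_op.
import pandas as pd


def fix_threshold_baseline(data, threshold=0.10):
    """Recompute the mean and drop violators until a pass removes nothing."""
    clean = False
    while not clean:
        clean = True
        baseline = data.mean()
        if baseline == 0:
            break
        for idx in data.index:
            # drop every point that deviates from the current mean by > threshold
            if abs(data.loc[idx] - baseline) / baseline > threshold:
                data = data.drop(idx)
                clean = False
    return baseline, data


values = pd.Series([98.0, 101.0, 100.0, 99.0, 130.0])  # 130.0 is the outlier
baseline, kept = fix_threshold_baseline(values)
print(baseline)    # 99.5 once 130.0 is dropped
print(list(kept))  # [98.0, 101.0, 100.0, 99.0]
```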

    def get_aggregate_data(self, raw_data_file, summary_rule_file):
        r"""Aggregate raw data according to the summary rule file.

        If the metric is aggregated by rank (:\d+), remove the rank suffix to generate the metric name, then aggregate the data.
        If the metric is aggregated by a regex pattern, aggregate the data and copy it to all metrics which match the pattern.

        Args:
            raw_data_file (str): the file name of the raw data file
            summary_rule_file (str): the file name of the summary rule file

        Returns:
            DataFrame: aggregated data
        """
        self.rs = ResultSummary()
        rules = self.rs._preprocess(raw_data_file, summary_rule_file)
        # parse rules for result summary
        if not self.rs._parse_rules(rules):
            return
        aggregated_df = pd.DataFrame()
        for rule in self.rs._sb_rules:
            single_metric_rule = self.rs._sb_rules[rule]
            metrics = list(single_metric_rule['metrics'].keys())
            data_df_of_rule = self.rs._raw_data_df[metrics]
            if self.rs._sb_rules[rule]['aggregate']:
                # if aggregate is True, aggregate in ranks
                if self.rs._sb_rules[rule]['aggregate'] is True:
                    data_df_of_rule = data_analysis.aggregate(data_df_of_rule)
                # if aggregate is a non-empty regex pattern, aggregate according to the pattern
                else:
                    pattern = self.rs._sb_rules[rule]['aggregate']
                    data_df_of_rule_with_short_name = data_analysis.aggregate(data_df_of_rule, pattern)
                    data_df_of_rule = pd.DataFrame(columns=metrics)
                    # restore the columns of data_df to full metric names
                    for metric in metrics:
                        short = ''
                        match = re.search(pattern, metric)
                        if match:
                            metric_in_list = list(metric)
                            for i in range(1, len(match.groups()) + 1):
                                metric_in_list[match.start(i):match.end(i)] = '*'
                            short = ''.join(metric_in_list)
                        data_df_of_rule[metric] = data_df_of_rule_with_short_name[short]
            aggregated_df = pd.concat([aggregated_df, data_df_of_rule], axis=1)
        return aggregated_df
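
As a toy illustration of the rank aggregation above (this mimics, but is not, the actual `data_analysis.aggregate` implementation), columns named `<metric>:<rank>` collapse into one column per metric; the metric names and the mean reduction across ranks are assumptions for the sketch:

```python
# Toy illustration of collapsing per-rank columns '<metric>:<rank>' into a
# single '<metric>' column; the mean across ranks is an assumed reduction.
import re

import pandas as pd

raw = pd.DataFrame({
    'kernel-launch/overhead:0': [0.10, 0.11],  # rank 0, one row per node
    'kernel-launch/overhead:1': [0.12, 0.09],  # rank 1
})
# strip the ':<rank>' suffix, then reduce the now-duplicate columns
short = raw.rename(columns=lambda c: re.sub(r':\d+$', '', c))
aggregated = short.T.groupby(level=0).mean().T
print(aggregated)
#    kernel-launch/overhead
# 0                    0.11
# 1                    0.10
```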

    def generate_baseline(self, algo, aggregated_df, diagnosis_rule_file, baseline):
        """Generate the baseline in JSON format.

        Args:
            algo (str): the algorithm to generate the baseline
            aggregated_df (DataFrame): aggregated data
            diagnosis_rule_file (str): the file name of the diagnosis rule file used by the fix_threshold algorithm
            baseline (dict): existing baseline of some metrics

        Returns:
            dict: baseline of the metrics defined in the diagnosis rule file for the fix_threshold algorithm, or in the summary rule file for mean
        """
        baseline = baseline or {}
        # re-organize metrics by benchmark names
        self._benchmark_metrics_dict = self._get_metrics_by_benchmarks(list(self._raw_data_df.columns))
        if algo == 'mean':
            mean_df = self._raw_data_df.mean()
            for metric in self._raw_data_df.columns:
                # keep the existing baseline value if the metric already has one
                if metric in baseline:
                    continue
                baseline[metric] = mean_df[metric]
        elif algo == 'fix_threshold':
            # read diagnosis rules
            rules = file_handler.read_rules(diagnosis_rule_file)
            if not self._parse_rules_and_baseline(rules, baseline):
                return baseline
            else:
                for rule in self._sb_rules:
                    single_metric_rule = self._sb_rules[rule]
                    metrics = list(single_metric_rule['metrics'].keys())
                    function_name = self._sb_rules[rule]['function']
                    rule_op = RuleOp.get_rule_func(DiagnosisRuleType(function_name))
                    outputs = Parallel(n_jobs=-1)(
                        delayed(self.fix_threshold_outlier_detection)
                        (aggregated_df[metric], single_metric_rule, metric, rule_op) for metric in metrics
                    )
                    for index, out in enumerate(outputs):
                        baseline[metrics[index]] = out[0]
                        aggregated_df[metrics[index]] = out[1]
        return baseline
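
The per-metric fan-out with joblib is worth seeing in isolation. A minimal sketch, with `detect` standing in for `fix_threshold_outlier_detection` and made-up metric names:

```python
# Minimal sketch of the joblib fan-out used above: each metric's series is
# independent, so the (baseline, cleaned series) pairs can be computed on
# all cores; detect() is a stand-in for fix_threshold_outlier_detection.
import pandas as pd
from joblib import Parallel, delayed


def detect(series):
    return series.mean(), series  # (baseline, normal data)


columns = {'metric_a': pd.Series([1.0, 2.0]), 'metric_b': pd.Series([3.0, 5.0])}
outputs = Parallel(n_jobs=-1)(delayed(detect)(s) for s in columns.values())
baselines = {metric: out[0] for metric, out in zip(columns, outputs)}
print(baselines)  # {'metric_a': 1.5, 'metric_b': 4.0}
```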

    def run(
        self,
        raw_data_file,
        summary_rule_file,
        output_dir,
        algorithm='mean',
        diagnosis_rule_file=None,
        baseline_file=None,
        digit=2
    ):
        """Export the baseline to a JSON file.

        If diagnosis_rule_file is None, use the mean of the data as the baseline.
        If diagnosis_rule_file is not None, use the rules in diagnosis_rule_file to execute the fix_threshold algorithm.

        Args:
            raw_data_file (str): the file name of the raw data file
            summary_rule_file (str): the file name of the summary rule file
            output_dir (str): the directory to save the baseline file
            algorithm (str): the algorithm to generate the baseline
            diagnosis_rule_file (str): the file name of the diagnosis rule file used by the fix_threshold algorithm
            baseline_file (str): the file name of the baseline file
            digit (int): the number of digits after the decimal point
        """
        try:
            # aggregate results from different devices
            self._raw_data_df = self.get_aggregate_data(raw_data_file, summary_rule_file)
            # read the existing baseline
            baseline = {}
            if baseline_file:
                baseline = file_handler.read_baseline(baseline_file)
            # generate the baseline according to the diagnosis rules and the fixed threshold outlier detection method
            baseline = self.generate_baseline(algorithm, self._raw_data_df, diagnosis_rule_file, baseline)
            for metric in baseline:
                val = baseline[metric]
                if isinstance(self._raw_data_df[metric].iloc[0], float):
                    baseline[metric] = f'%.{digit}g' % val if abs(val) < 1 else f'%.{digit}f' % val
                elif isinstance(self._raw_data_df[metric].iloc[0], int):
                    baseline[metric] = int(val)
                else:
                    try:
                        baseline[metric] = float(val)
                    except Exception as e:
                        logger.error('Analyzer: {} baseline is not numeric, msg: {}'.format(metric, str(e)))
            baseline = json.dumps(baseline, indent=2, sort_keys=True)
            baseline = re.sub(r': "(\d+\.?\d*)"', r': \1', baseline)
            with open(output_dir + '/baseline.json', mode='w') as f:
                f.write(baseline)
        except Exception as e:
            logger.error('Analyzer: generate baseline failed, msg: {}'.format(str(e)))
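
The rounding and quote-stripping at the end of run() can be puzzling at first glance. A standalone sketch of just that step, with made-up metric names and values:

```python
# Standalone sketch of the final formatting step: values below 1 keep
# `digit` significant figures, larger values keep `digit` decimals, and the
# regex then strips the quotes so the JSON stores numbers, not strings.
import json
import re

baseline = {'metric_a': 0.012345, 'metric_b': 123.456}  # made-up values
digit = 2
formatted = {
    metric: f'%.{digit}g' % val if abs(val) < 1 else f'%.{digit}f' % val
    for metric, val in baseline.items()
}
text = json.dumps(formatted, indent=2, sort_keys=True)
text = re.sub(r': "(\d+\.?\d*)"', r': \1', text)
print(text)
# {
#   "metric_a": 0.012,
#   "metric_b": 123.46
# }
```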

if __name__ == '__main__':

Review comment: Would you please remove the main()? It should be a feature of the sb command.

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--algo',
        type=str,
        default='fix_threshold',
        required=False,
        help='Algorithm to generate the baseline, e.g., mean/fix_threshold.'
    )
    parser.add_argument(
        '--input_dir',
        type=str,
        default=None,
        required=False,
        help='Input directory which stores the results-summary.jsonl.'
    )
    parser.add_argument(
        '--diagnosis_rule_file', type=str, default=None, required=False, help='The input path of the diagnosis rule file.'
    )
    parser.add_argument(
        '--summary_rule_file', type=str, default=None, required=False, help='The input path of the summary rule file.'
    )
    args = parser.parse_args()
    folder = args.input_dir
    if args.algo == 'mean':
        # simply use the mean; needs result summary rules to define how to aggregate the metrics.
        print('Generate baseline using the mean of the data.')

Review comment: Do we need to use logger?

        GenerateBaseline().run(folder + '/results-summary.jsonl', args.summary_rule_file, folder, 'mean')
    elif args.algo == 'fix_threshold':
        # use the fixed threshold method; needs result summary rules to define how to aggregate the metrics
        # and diagnosis_rules.yaml to define the rules for the metrics.
        print('Generate baseline using the fix threshold algorithm; the threshold is defined in rules/diagnosis_rules.yaml.')
        GenerateBaseline().run(
            folder + '/results-summary.jsonl', args.summary_rule_file, folder, 'fix_threshold', args.diagnosis_rule_file
        )

Review comment: Would you please add a document for this function?
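
Assuming the file lands as a standalone script (pending the review comments above), a typical invocation against a SuperBench output directory would look like `python generate_baseline.py --input_dir outputs --summary_rule_file rules/summary_rules.yaml --diagnosis_rule_file rules/diagnosis_rules.yaml --algo fix_threshold`; the rule file paths here are placeholders, not paths fixed by this PR.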