From bd6aab25e52b8dbd7db66747b7299c013ec33edd Mon Sep 17 00:00:00 2001 From: hongtaozhang Date: Fri, 13 Dec 2024 10:39:25 -0800 Subject: [PATCH] Add func to handle waied and unsupported test cases. --- examples/benchmarks/nvbandwidth.py | 2 +- .../micro_benchmarks/nvbandwidth.py | 128 +++++++++++++----- .../micro_benchmarks/test_nvbandwidth.py | 4 +- 3 files changed, 96 insertions(+), 38 deletions(-) diff --git a/examples/benchmarks/nvbandwidth.py b/examples/benchmarks/nvbandwidth.py index e89edfbfc..afdb46ddf 100644 --- a/examples/benchmarks/nvbandwidth.py +++ b/examples/benchmarks/nvbandwidth.py @@ -16,7 +16,7 @@ platform=Platform.CUDA, parameters=( '--buffer_size 128 ' - '--test_cases 0 1 19 20 ' + '--test_cases host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce ' '--skip_verification ' '--disable_affinity ' '--use_mean ' diff --git a/superbench/benchmarks/micro_benchmarks/nvbandwidth.py b/superbench/benchmarks/micro_benchmarks/nvbandwidth.py index 110966b86..2f6a9c3c0 100644 --- a/superbench/benchmarks/micro_benchmarks/nvbandwidth.py +++ b/superbench/benchmarks/micro_benchmarks/nvbandwidth.py @@ -4,15 +4,23 @@ """Module of the NV Bandwidth Test.""" import os +import subprocess import re from superbench.common.utils import logger -from superbench.benchmarks import BenchmarkRegistry, Platform +from superbench.benchmarks import BenchmarkRegistry, Platform, ReturnCode from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke class NvBandwidthBenchmark(MicroBenchmarkWithInvoke): """The NV Bandwidth Test benchmark class.""" + # Regular expressions for summary line and matrix header detection + re_block_start_pattern = re.compile(r'^Running\s+(.+)$') + re_matrix_header_line = re.compile(r'^(memcpy|memory latency)') + re_matrix_row_pattern = re.compile(r'^\s*\d') + re_summary_pattern = re.compile(r'SUM (\S+) (\d+\.\d+)') + re_unsupported_pattern = re.compile(r'ERROR: Testcase (\S+) not found!') + def __init__(self, name, parameters=''): """Constructor. @@ -43,8 +51,8 @@ def add_parser_arguments(self): default=[], required=False, help=( - 'Specify the test case(s) to execute, either by name or index. ' - 'To view the available test case names or indices, run the command nvbandwidth on the host. ' + 'Specify the test case(s) to execute by name only. ' + 'To view the available test case names, run the command "nvbandwidth -l" on the host. ' 'If no specific test case is specified, all test cases will be executed by default.' ), ) @@ -95,6 +103,8 @@ def _preprocess(self): if self._args.test_cases: command += ' --testcase ' + ' '.join(self._args.test_cases) + else: + self._args.test_cases = self._get_all_test_cases() if self._args.skip_verification: command += ' --skipVerification' @@ -113,78 +123,79 @@ def _preprocess(self): return True def _process_raw_line(self, line, parse_status): - """Process a single line of raw output from the nvbandwidth benchmark. - - This function updates the `parse_status` dictionary with parsed results from the given `line`. - It detects the start of a test, parses matrix headers and rows, and extracts summary results. + """Process a raw line of text and update the parse status accordingly. Args: - line (str): A single line of raw output from the benchmark. - parse_status (dict): A dictionary to maintain the current parsing state and results. It should contain: - - 'test_name' (str): The name of the current test being parsed. - - 'benchmark_type' (str): 'bw' or 'lat'. It also indicating if matrix data is being parsed. - - 'matrix_header' (list): The header of the matrix being parsed. - - 'results' (dict): A dictionary to store the parsed results. + line (str): The raw line of text to be processed. + parse_status (dict): A dictionary containing the current parsing status, + which will be updated based on the content of the line. - Return: + Returns: None """ - # Regular expressions for summary line and matrix header detection - block_start_pattern = re.compile(r'^Running\s+(.+)$') - summary_pattern = re.compile(r'SUM (\S+) (\d+\.\d+)') - matrix_header_line = re.compile(r'^(memcpy|memory latency)') - matrix_row_pattern = re.compile(r'^\s*\d') - line = line.strip() + # Detect unsupported test cases + if self.re_unsupported_pattern.match(line): + parse_status['unsupported_testcases'].add(self.re_unsupported_pattern.match(line).group(1).lower()) + return + # Detect the start of a test - if block_start_pattern.match(line): - parse_status['test_name'] = block_start_pattern.match(line).group(1).lower()[:-1] + if self.re_block_start_pattern.match(line): + parse_status['test_name'] = self.re_block_start_pattern.match(line).group(1).lower()[:-1] + parse_status['excuted_testcases'].add(parse_status['test_name']) return # Detect the start of matrix data - if parse_status['test_name'] and matrix_header_line.match(line): + if parse_status['test_name'] and self.re_matrix_header_line.match(line): parse_status['benchmark_type'] = 'bw' if 'bandwidth' in line else 'lat' + # Parse the row and column name + tmp_idx = line.find('(row)') + parse_status['metrix_row'] = line[tmp_idx - 3:tmp_idx].lower() + tmp_idx = line.find('(column)') + parse_status['metrix_col'] = line[tmp_idx - 3:tmp_idx].lower() return # Parse the matrix header if ( parse_status['test_name'] and parse_status['benchmark_type'] and not parse_status['matrix_header'] - and matrix_row_pattern.match(line) + and self.re_matrix_row_pattern.match(line) ): parse_status['matrix_header'] = line.split() return # Parse matrix rows - if parse_status['test_name'] and parse_status['benchmark_type'] and matrix_row_pattern.match(line): + if parse_status['test_name'] and parse_status['benchmark_type'] and self.re_matrix_row_pattern.match(line): row_data = line.split() row_index = row_data[0] for col_index, value in enumerate(row_data[1:], start=1): - # Skip 'N/A' values + # Skip 'N/A' values, 'N/A' indicates the test path is self to self. if value == 'N/A': continue col_header = parse_status['matrix_header'][col_index - 1] test_name = parse_status['test_name'] benchmark_type = parse_status['benchmark_type'] - metric_name = f'{test_name}_cpu{row_index}_gpu{col_header}_{benchmark_type}' + row_name = parse_status['metrix_row'] + col_name = parse_status['metrix_col'] + metric_name = f'{test_name}_{row_name}{row_index}_{col_name}{col_header}_{benchmark_type}' parse_status['results'][metric_name] = float(value) return # Parse summary results - summary_match = summary_pattern.search(line) - if summary_match: - value = summary_match.group(2) - # Skip 'N/A' values - if value != 'N/A': - test_name = parse_status['test_name'] - benchmark_type = parse_status['benchmark_type'] - parse_status['results'][f'{test_name}_sum_{benchmark_type}'] = float(value) + if self.re_summary_pattern.match(line): + value = self.re_summary_pattern.match(line).group(2) + test_name = parse_status['test_name'] + benchmark_type = parse_status['benchmark_type'] + parse_status['results'][f'{test_name}_sum_{benchmark_type}'] = float(value) # Reset parsing state for next test parse_status['test_name'] = '' parse_status['benchmark_type'] = None parse_status['matrix_header'].clear() + parse_status['metrix_row'] = '' + parse_status['metrix_col'] = '' + return def _process_raw_result(self, cmd_idx, raw_output): """Function to parse raw results and save the summarized results. @@ -203,22 +214,45 @@ def _process_raw_result(self, cmd_idx, raw_output): content = raw_output.splitlines() parsing_status = { 'results': {}, + 'excuted_testcases': set(), + 'unsupported_testcases': set(), 'benchmark_type': None, 'matrix_header': [], 'test_name': '', + 'metrix_row': '', + 'metrix_col': '', } for line in content: self._process_raw_line(line, parsing_status) + return_code = ReturnCode.SUCCESS + # Log unsupported test cases + for testcase in parsing_status['unsupported_testcases']: + logger.warning(f'Test case {testcase} is not supported.') + return_code = ReturnCode.INVALID_ARGUMENT + self._result.add_raw_data(testcase, 'Not supported', self._args.log_raw_data) + + # Check if the test case was waived + for testcase in self._args.test_cases: + if ( + testcase not in parsing_status['unsupported_testcases'] + and testcase not in parsing_status['excuted_testcases'] + ): + logger.warning(f'Test case {testcase} was waived.') + self._result.add_raw_data(testcase, 'waived', self._args.log_raw_data) + return_code = ReturnCode.INVALID_ARGUMENT + if not parsing_status['results']: self._result.add_raw_data('nvbandwidth', 'No valid results found', self._args.log_raw_data) + return_code = ReturnCode.MICROBENCHMARK_RESULT_PARSING_FAILURE return False # Store parsed results for metric, value in parsing_status['results'].items(): self._result.add_result(metric, value) + self._result.set_return_code(return_code) return True except Exception as e: logger.error( @@ -229,5 +263,29 @@ def _process_raw_result(self, cmd_idx, raw_output): self._result.add_result('abort', 1) return False + @staticmethod + def _get_all_test_cases(): + command = 'nvbandwidth -l' + test_case_pattern = re.compile(r'(\d+),\s+([\w_]+):') + + try: + # Execute the command and capture output + result = subprocess.run(command, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # Check the return code + if result.returncode != 0: + logger.error(f'{command} failed with return code {result.returncode}') + return [] + + if result.stderr: + logger.error(f'{command} failed with {result.stderr}') + return [] + + # Parse the output + return [name for _, name in test_case_pattern.findall(result.stdout)] + except Exception as e: + logger.error(f'Failed to get all test case names: {e}') + return [] + BenchmarkRegistry.register_benchmark('nvbandwidth', NvBandwidthBenchmark, platform=Platform.CUDA) diff --git a/tests/benchmarks/micro_benchmarks/test_nvbandwidth.py b/tests/benchmarks/micro_benchmarks/test_nvbandwidth.py index 630e77179..7edf69d5f 100644 --- a/tests/benchmarks/micro_benchmarks/test_nvbandwidth.py +++ b/tests/benchmarks/micro_benchmarks/test_nvbandwidth.py @@ -34,7 +34,7 @@ def test_nvbandwidth_preprocess(self): # Test preprocess with specified parameters parameters = ( '--buffer_size 256 ' - '--test_cases 0 1 2 19 20 ' + '--test_cases host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce ' '--skip_verification ' '--disable_affinity ' '--use_mean ' @@ -47,7 +47,7 @@ def test_nvbandwidth_preprocess(self): # Check command assert (1 == len(benchmark._commands)) assert ('--bufferSize 256' in benchmark._commands[0]) - assert ('--testcase 0 1 2 19 20' in benchmark._commands[0]) + assert ('--testcase host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce' in benchmark._commands[0]) assert ('--skipVerification' in benchmark._commands[0]) assert ('--disableAffinity' in benchmark._commands[0]) assert ('--useMean' in benchmark._commands[0])