diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1c885bb --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# common file patterns +.DS_Store +.idea +*.iml +*.pyc +__pycache__ +*.jar +*.class +.project +*.prefs +_build +venv +*.log +*.tar.gz +*.db +*.db-journal +*.mdb +.env +.vscode/* +Cargo.lock +**/*.egg-info + +# excluded paths +/data/ +/logs/ +/jobs/ +/metric/ +/python/dist/ +/python/build/ diff --git a/README.md b/README.md index 8358063..68c895b 100644 --- a/README.md +++ b/README.md @@ -1 +1,5 @@ -# FATE-Test \ No newline at end of file +# FATE-Test + +A collection of useful tools for running FATE tasks, including tests and benchmark comparisons. +For a tutorial on using FATE-Test command line tools and compiling test suites, please +refer to this [document](./doc/fate_test.md). \ No newline at end of file diff --git a/README_zh.md b/README_zh.md new file mode 100644 index 0000000..ec07a1e --- /dev/null +++ b/README_zh.md @@ -0,0 +1,4 @@ +# FATE-Test + +FATE-Test用于批量执行FATE任务,包括测试用例和性能及效果测试。 +相关命令行工具说明及使用方法请参考[文档](./doc/fate_test.md)。 diff --git a/RELEASE.md b/RELEASE.md new file mode 100644 index 0000000..798dfea --- /dev/null +++ b/RELEASE.md @@ -0,0 +1,4 @@ +## Release 2.0.0-beta +### Major Features and Improvements +> Fate-Test: FATE Automated Testing Tool +* Migrated automated testing for functionality, performance, and correctness diff --git a/doc/fate_test.md b/doc/fate_test.md new file mode 100644 index 0000000..b7e35b7 --- /dev/null +++ b/doc/fate_test.md @@ -0,0 +1,91 @@ +# FATE Test Tutorial + +A collection of useful tools for running FATE tests and PipeLine tasks. + +## quick start + +1. install + + ```bash + pip install -e python/fate_test + ``` +2. edit default fate\_test\_config.yaml + + ```bash + # edit priority config file with system default editor + # fill in the fields according to the comments + fate_test config edit + ``` + +3. 
configure FATE-Flow Commandline server setting + + ```bash + # configure FATE-Flow Commandline server setting + flow init --port 9380 --ip 127.0.0.1 + ``` + +4. run some fate\_test suite + + ```bash + fate_test suite -i + ``` + +5. run some fate\_test benchmark quality + + ```bash + fate_test benchmark-quality -i + ``` + +6. run some fate\_test benchmark performance + + ```bash + fate_test performance -i + ``` + +7. useful logs or exceptions will be saved to the logs dir under the namespace +shown in the last step + +## command types + +- [suite](../api/fate_test.md#testsuite): used for running [testsuites](../api/fate_test.md#testsuite-configuration), + collections of FATE jobs + + ```bash + fate_test suite -i + ``` + +- [data](../api/fate_test.md#data): used for uploading, deleting, and generating datasets + + - [upload/delete data](../api/fate_test.md#data-command-options) command: + + ```bash + fate_test data [upload|delete] -i + ``` + - [upload example data of min_test/all_examples](../api/fate_test.md#data-command-options) command: + + ```bash + fate_test data upload -t min_test + fate_test data upload -t all_examples + ``` + + - [generate data](../api/fate_test.md#generate-command-options) command: + + ```bash + fate_test data generate -i + ``` + +- [benchmark-quality](../api/fate_test.md#benchmark-quality): used for comparing modeling quality between FATE + and other machine learning systems, as specified + in [benchmark job configuration](../api/fate_test.md#benchmark-job-configuration) + + ```bash + fate_test bq -i + ``` + +- [benchmark-performance](../api/fate_test.md#benchmark-performance): used for checking FATE algorithm performance; user + should first generate and upload data before running performance testsuite + + ```bash + fate_test data generate -i -ng 10000 -fg 10 -fh 10 -m 1.0 --upload-data + fate_test performance -i --skip-data + ``` \ No newline at end of file diff --git a/python/fate_test/__init__.py b/python/fate_test/__init__.py new file mode 100644 index 
0000000..878d3a9 --- /dev/null +++ b/python/fate_test/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/fate_test/_ascii.py b/python/fate_test/_ascii.py new file mode 100644 index 0000000..ac3ba12 --- /dev/null +++ b/python/fate_test/_ascii.py @@ -0,0 +1,48 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + + +HEAD = """\ + +████████╗███████╗███████╗████████╗███████╗██╗ ██╗██╗████████╗███████╗ +╚══██╔══╝██╔════╝██╔════╝╚══██╔══╝██╔════╝██║ ██║██║╚══██╔══╝██╔════╝ + ██║ █████╗ ███████╗ ██║ ███████╗██║ ██║██║ ██║ █████╗ + ██║ ██╔══╝ ╚════██║ ██║ ╚════██║██║ ██║██║ ██║ ██╔══╝ + ██║ ███████╗███████║ ██║ ███████║╚██████╔╝██║ ██║ ███████╗ + ╚═╝ ╚══════╝╚══════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝ ╚══════╝ + +""" + +BENCHMARK = """\ + +██████╗ ███████╗███╗ ██╗ ██████╗██╗ ██╗███╗ ███╗ █████╗ ██████╗ ██╗ ██╗ +██╔══██╗██╔════╝████╗ ██║██╔════╝██║ ██║████╗ ████║██╔══██╗██╔══██╗██║ ██╔╝ +██████╔╝█████╗ ██╔██╗ ██║██║ ███████║██╔████╔██║███████║██████╔╝█████╔╝ +██╔══██╗██╔══╝ ██║╚██╗██║██║ ██╔══██║██║╚██╔╝██║██╔══██║██╔══██╗██╔═██╗ +██████╔╝███████╗██║ ╚████║╚██████╗██║ ██║██║ ╚═╝ ██║██║ ██║██║ ██║██║ ██╗ +╚═════╝ ╚══════╝╚═╝ ╚═══╝ ╚═════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝ +""" + +TAIL = """\ + + ██╗ ██╗ █████╗ ██╗ ██╗███████╗ ███████╗██╗ ██╗███╗ ██╗ + ██║ ██║██╔══██╗██║ ██║██╔════╝ ██╔════╝██║ ██║████╗ ██║ + ███████║███████║██║ ██║█████╗ █████╗ ██║ ██║██╔██╗ ██║ + ██╔══██║██╔══██║╚██╗ ██╔╝██╔══╝ ██╔══╝ ██║ ██║██║╚██╗██║ + ██║ ██║██║ ██║ ╚████╔╝ ███████╗ ██║ ╚██████╔╝██║ ╚████║ + ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═══╝ ╚══════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ + +""" diff --git a/python/fate_test/_client.py b/python/fate_test/_client.py new file mode 100644 index 0000000..d802b8a --- /dev/null +++ b/python/fate_test/_client.py @@ -0,0 +1,44 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +from fate_test._flow_client import FLOWClient +from fate_test._parser import Config + + +class Clients(object): + def __init__(self, config: Config): + self._flow_clients = {} + # self._tunnel_id_to_flow_clients = {} + self._role_str_to_service_id = {} + for party, service_id in config.party_to_service_id.items(): + for role_str in config.parties.party_to_role_string(party): + self._role_str_to_service_id[role_str] = service_id + + for service_id, service in config.service_id_to_service.items(): + if isinstance(service, Config.service): + self._flow_clients[service_id] = FLOWClient( + service.address, config.data_base_dir, config.cache_directory) + + def __getitem__(self, role_str: str) -> 'FLOWClient': + if role_str not in self._role_str_to_service_id: + raise RuntimeError(f"no flow client found binding to {role_str}") + return self._flow_clients[self._role_str_to_service_id[role_str]] + + def contains(self, role_str): + return role_str in self._role_str_to_service_id + + def all_roles(self): + return sorted(self._role_str_to_service_id.keys()) diff --git a/python/fate_test/_config.py b/python/fate_test/_config.py new file mode 100644 index 0000000..3553adf --- /dev/null +++ b/python/fate_test/_config.py @@ -0,0 +1,252 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import json +import os +import typing +from collections import namedtuple +from pathlib import Path + +from ruamel import yaml + +template = """\ +# base dir for data upload conf, e.g. data_base_dir={FATE} +# also used for accessing local files when running standalone mode +# examples/data/breast_hetero_guest.csv -> $data_base_dir/examples/data/breast_hetero_guest.csv +data_base_dir: path(FATE) + +# directory dedicated to fate_test job file storage, default cache location={FATE}/examples/cache/ +cache_directory: examples/cache/ +# directory that stores performance benchmark suites, default location={FATE}/examples/benchmark_performance +performance_template_directory: examples/benchmark_performance/ +# directory that stores flow test config, default location={FATE}/examples/flow_test_template/hetero_lr/flow_test_config.yaml +# st_config_directory: examples/flow_test_template/hetero_lr/flow_test_config.yaml + +# path of the testsuite file with min_test data sets to upload, +# default location={FATE}/examples/data/upload_config/min_test_data_testsuite.json +min_test_data_config: examples/data/upload_config/min_test_data_testsuite.json +# path of the testsuite file with all example data sets to upload, +# default location={FATE}/examples/data/upload_config/all_examples_data_testsuite.json +all_examples_data_config: examples/data/upload_config/all_examples_data_testsuite.json + +# directory where FATE code is located, default installation location={FATE}/fate +# python/ml -> $fate_base/python/ml +fate_base: path(FATE)/fate + +# whether to delete data in suites after all jobs are done +clean_data: true + +# participating parties' ids and corresponding flow service ip & port information +parties: + guest: [9999] + host: [10000, 9999] + arbiter: [10000] +services: + - flow_services: + - {address: 127.0.0.1:9380, parties: [9999, 10000]} + serving_setting: + address: 127.0.0.1:8059 + + ssh_tunnel: # optional + enable: false + ssh_address: : + ssh_username: + ssh_password: # 
optional + ssh_priv_key: "~/.ssh/id_rsa" + + +# what is ssh_tunnel? +# to open the ssh tunnel(s) if the remote service +# cannot be accessed directly from the location where the test suite is run! +# +# +---------------------+ +# | ssh address | +# | ssh username | +# | ssh password/ | +# +--------+ | ssh priv_key | +----------------+ +# |local ip+----------ssh tunnel-------------->+remote local ip | +# +--------+ | | +----------------+ +# | | +# request local ip:port +----- as if --------->request remote's local ip:port from remote side +# | | +# | | +# +---------------------+ +# + +""" + +data_base_dir = Path(__file__).resolve().parents[3] +if (data_base_dir / 'examples').is_dir(): + template = template.replace('path(FATE)', str(data_base_dir)) + +_default_config = Path(__file__).resolve().parent / 'fate_test_config.yaml' + +data_switch = None +use_local_data = 1 +data_alter = dict() +deps_alter = dict() +jobs_num = 0 +jobs_progress = 0 +non_success_jobs = [] + + +def create_config(path: Path, override=False): + if path.exists() and not override: + raise FileExistsError(f"{path} exists") + + with path.open("w") as f: + f.write(template) + + +def default_config(): + if not _default_config.exists(): + create_config(_default_config) + return _default_config + + +class Parties(object): + def __init__(self, **kwargs): + """ + mostly, accept guest, host and arbiter + """ + self._role_to_parties = kwargs + + self._party_to_role_string = {} + for role in kwargs: + parties = kwargs[role] + setattr(self, role, parties) + for i, party in enumerate(parties): + if party not in self._party_to_role_string: + self._party_to_role_string[party] = set() + self._party_to_role_string[party].add(f"{role.lower()}_{i}") + + @staticmethod + def from_dict(d: typing.MutableMapping[str, typing.List[int]]): + return Parties(**d) + + def party_to_role_string(self, party): + return self._party_to_role_string[party] + + def extract_role(self, counts: typing.MutableMapping[str, int]): + roles = 
{} + for role, num in counts.items(): + if role not in self._role_to_parties and num > 0: + raise ValueError(f"{role} not found in config") + else: + if len(self._role_to_parties[role]) < num: + raise ValueError(f"require {num} {role} parties, only {len(self._role_to_parties[role])} in config") + roles[role] = self._role_to_parties[role][:num] + return roles + + def extract_initiator_role(self, role): + initiator_role = role.strip() + if len(self._role_to_parties[initiator_role]) < 1: + raise ValueError(f"role {initiator_role} has empty party list") + party_id = self._role_to_parties[initiator_role][0] + return dict(role=initiator_role, party_id=party_id) + + +class Config(object): + service = namedtuple("service", ["address"]) + tunnel_service = namedtuple("tunnel_service", ["tunnel_id", "index"]) + tunnel = namedtuple("tunnel", ["ssh_address", "ssh_username", "ssh_password", "ssh_priv_key", "services_address"]) + + def __init__(self, config): + self.data_base_dir = config["data_base_dir"] + self.cache_directory = os.path.join(config["data_base_dir"], config["cache_directory"]) + self.perf_template_dir = os.path.join(config["data_base_dir"], config["performance_template_directory"]) + # self.flow_test_config_dir = os.path.join(config["data_base_dir"], config["flow_test_config_directory"]) + self.min_test_data_config = os.path.join(config["data_base_dir"], config["min_test_data_config"]) + self.all_examples_data_config = os.path.join(config["data_base_dir"], config["all_examples_data_config"]) + self.fate_base = config["fate_base"] + self.clean_data = config.get("clean_data", True) + self.parties = Parties.from_dict(config["parties"]) + self.role = config["parties"] + self.serving_setting = config["services"][0] + self.party_to_service_id = {} + self.service_id_to_service = {} + self.tunnel_id_to_tunnel = {} + self.extend_sid = None + self.auto_increasing_sid = None + self.task_cores = None + self.timeout = None + # self.work_mode = config.get("work_mode", 0) + + 
service_id = 0 + os.makedirs(os.path.dirname(self.cache_directory), exist_ok=True) + for service_config in config["services"]: + flow_services = service_config["flow_services"] + for flow_service in flow_services: + service_id += 1 + address = flow_service["address"] + self.service_id_to_service[service_id] = self.service(address) + for party in flow_service["parties"]: + self.party_to_service_id[party] = service_id + + def update_conf(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + + @staticmethod + def load(path: typing.Union[str, Path], **kwargs): + if isinstance(path, str): + path = Path(path) + config = {} + if path is not None: + with path.open("r") as f: + config.update(yaml.safe_load(f)) + + if config["data_base_dir"] == "path(FATE)": + raise ValueError("Invalid 'data_base_dir'.") + config["data_base_dir"] = path.resolve().joinpath(config["data_base_dir"]).resolve() + + config.update(kwargs) + return Config(config) + + @staticmethod + def load_from_file(path: typing.Union[str, Path]): + """ + Loads conf content from json or yaml file. 
Used to read in parameter configuration + Parameters + ---------- + path: str, path to conf file, should be absolute path + + Returns + ------- + dict, parameter configuration in dictionary format + + """ + if isinstance(path, str): + path = Path(path) + config = {} + if path is not None: + file_type = path.suffix + with path.open("r") as f: + if file_type == ".yaml": + config.update(yaml.safe_load(f)) + elif file_type == ".json": + config.update(json.load(f)) + else: + raise ValueError(f"Cannot load conf from file type {file_type}") + return config + + +def parse_config(config): + try: + config_inst = Config.load(config) + except Exception as e: + raise RuntimeError(f"error parse config from {config}") from e + return config_inst diff --git a/python/fate_test/_flow_client.py b/python/fate_test/_flow_client.py new file mode 100644 index 0000000..2ccdf04 --- /dev/null +++ b/python/fate_test/_flow_client.py @@ -0,0 +1,322 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import json +import os +import time +import typing +from datetime import timedelta +from pathlib import Path + +from fate_test import _config +from fate_test._parser import Data + +from fate_client.flow_sdk import FlowClient + + +class FLOWClient(object): + + def __init__(self, + address: typing.Optional[str], + data_base_dir: typing.Optional[Path], + cache_directory: typing.Optional[Path]): + self.address = address + self.version = "v2" + self._client = FlowClient(self.address.split(':')[0], self.address.split(':')[1], self.version) + self._data_base_dir = data_base_dir + self._cache_directory = cache_directory + self.data_size = 0 + + def set_address(self, address): + self.address = address + + def transform_local_file_to_dataframe(self, data: Data, callback=None, output_path=None): + data_warehouse = self.upload_data(data, callback, output_path) + status = self.transform_to_dataframe(data.namespace, data.table_name, data_warehouse, callback) + return status + + def upload_data(self, data: Data, callback=None, output_path=None): + response, file_path = self._upload_data(data, output_path=output_path) + try: + if callback is not None: + callback(response) + code = response["code"] + if code != 0: + raise ValueError(f"Return code {code}!=0") + + namespace = response["data"]["namespace"] + name = response["data"]["name"] + job_id = response["job_id"] + except BaseException: + raise ValueError(f"Upload data fails, response={response}") + # self.monitor_status(job_id, role=self.role, party_id=self.party_id) + self._awaiting(job_id, "local", 0) + + return dict(namespace=namespace, name=name) + + def transform_to_dataframe(self, namespace, table_name, data_warehouse, callback=None): + response = self._client.data.dataframe_transformer(namespace=namespace, + name=table_name, + data_warehouse=data_warehouse) + + try: + if callback is not None: + callback(response) + status = self._awaiting(response["job_id"], "local", 0) + status = str(status).lower() + else: + status 
= response["retmsg"] + + except Exception as e: + raise RuntimeError("transform data to dataframe failed") from e + job_id = response["job_id"] + self._awaiting(job_id, "local", 0) + return status + + def delete_data(self, data: Data): + try: + table_name = data.config['table_name'] if data.config.get( + 'table_name', None) is not None else data.config.get('name') + self._client.table.delete(table_name=table_name, namespace=data.config['namespace']) + except Exception as e: + raise RuntimeError("delete data failed") from e + + def output_data_table(self, job_id, role, party_id, task_name, output_data_name): + data_info = self._output_data_table(job_id=job_id, role=role, party_id=party_id, task_name=task_name) + output_data_list = data_info.get(output_data_name) + if not output_data_list: + raise ValueError(f"output data name {output_data_name} not found") + return output_data_list[0] + + def table_query(self, table_name, namespace): + result = self._table_query(name=table_name, namespace=namespace) + return result + + def add_notes(self, job_id, role, party_id, notes): + self._client.job.add_notes(job_id, role=role, party_id=party_id, notes=notes) + + """def add_notes(self, job_id, role, party_id, notes): + self._add_notes(job_id=job_id, role=role, party_id=party_id, notes=notes)""" + + def _awaiting(self, job_id, role, party_id, callback=None): + while True: + response = self._query_job(job_id, role=role, party_id=party_id) + if response.status.is_done(): + return response.status + if callback is not None: + callback(response) + time.sleep(1) + + def _upload_data(self, data, output_path=None, verbose=0, destroy=1): + conf = data.config + # if conf.get("engine", {}) != "PATH": + if output_path is not None: + conf['file'] = os.path.join(os.path.abspath(output_path), os.path.basename(conf.get('file'))) + else: + if _config.data_switch is not None: + conf['file'] = os.path.join(str(self._cache_directory), os.path.basename(conf.get('file'))) + else: + conf['file'] = 
os.path.join(str(self._data_base_dir), conf.get('file')) + path = Path(conf.get('file')) + if not path.exists(): + raise Exception('The file is obtained from the fate flow client machine, but it does not exist, ' + f'please check the path: {path}') + response = self._client.data.upload(file=str(path), + head=data.head, + meta=data.meta, + extend_sid=data.extend_sid, + partitions=data.partitions) + return response, conf["file"] + + def _output_data_table(self, job_id, role, party_id, task_name): + response = self._client.output.data_table(job_id, role=role, party_id=party_id, task_name=task_name) + if response.get("code") is not None: + raise ValueError(f"Query output data table failed, response={response}") + + return response + + def _table_query(self, name, namespace): + response = self._client.table.query(namespace=namespace, name=name) + try: + code = response["code"] + if code != 0: + raise ValueError(f"Return code {code}!=0") + return json.dumps(response["data"], indent=4) + except BaseException: + raise ValueError(f"Query table fails, response={response}") + + def _delete_data(self, table_name, namespace): + response = self._client.table.delete(namespace=namespace, table_name=table_name) + return response + + def query_job(self, job_id, role, party_id): + response = self._client.task.query(job_id, role=role, party_id=party_id) + return response + + def _query_job(self, job_id, role, party_id): + response = self._client.job.query(job_id, role, party_id) + return QueryJobResponse(response) + + def get_version(self): + response = self._client.provider.query(name="fate") + try: + retcode = response['code'] + retmsg = response['message'] + if retcode != 0 or retmsg != 'success': + raise RuntimeError(f"get version error: {response}") + fate_version = response["data"][0]["provider_name"] + except Exception as e: + raise RuntimeError(f"get version error: {response}") from e + return fate_version + + """def _add_notes(self, job_id, role, party_id, notes): + data = 
dict(job_id=job_id, role=role, party_id=party_id, notes=notes) + response = AddNotesResponse(self._post(url='job/update', json=data)) + return response""" + + def _table_bind(self, data): + response = self._post(url='table/bind', json=data) + try: + retcode = response['retcode'] + retmsg = response['retmsg'] + if retcode != 0 or retmsg != 'success': + raise RuntimeError(f"table bind error: {response}") + except Exception as e: + raise RuntimeError(f"table bind error: {response}") from e + return response + + +class Status(object): + def __init__(self, status: str): + self.status = status + + def is_done(self): + return self.status.lower() in ['complete', 'success', 'canceled', 'failed', "timeout"] + + def is_success(self): + return self.status.lower() in ['complete', 'success'] + + def __str__(self): + return self.status + + def __repr__(self): + return self.__str__() + + +class QueryJobResponse(object): + def __init__(self, response: dict): + try: + status = Status(response.get('data')[0]["status"]) + progress = response.get('data')[0]['progress'] + except Exception as e: + raise RuntimeError(f"query job error, response: {json.dumps(response, indent=4)}") from e + self.status = status + self.progress = progress + + +class UploadDataResponse(object): + def __init__(self, response: dict): + try: + self.job_id = response["jobId"] + except Exception as e: + raise RuntimeError(f"upload error, response: {response}") from e + self.status: typing.Optional[Status] = None + + +class AddNotesResponse(object): + def __init__(self, response: dict): + try: + retcode = response['retcode'] + retmsg = response['retmsg'] + if retcode != 0 or retmsg != 'success': + raise RuntimeError(f"add notes error: {response}") + except Exception as e: + raise RuntimeError(f"add notes error: {response}") from e + + +"""class SubmitJobResponse(object): + def __init__(self, response: dict): + try: + self.job_id = response["jobId"] + self.model_info = response["data"]["model_info"] + except 
Exception as e: + raise RuntimeError(f"submit job error, response: {response}") from e + self.status: typing.Optional[Status] = None +""" + + +class DataProgress(object): + def __init__(self, role_str): + self.role_str = role_str + self.start = time.time() + self.show_str = f"[{self.elapse()}] {self.role_str}" + self.job_id = "" + + def elapse(self): + return f"{timedelta(seconds=int(time.time() - self.start))}" + + def submitted(self, job_id): + self.job_id = job_id + self.show_str = f"[{self.elapse()}]{self.job_id} {self.role_str}" + + def update(self): + self.show_str = f"[{self.elapse()}]{self.job_id} {self.role_str}" + + def show(self): + return self.show_str + + +class JobProgress(object): + def __init__(self, name): + self.name = name + self.start = time.time() + self.show_str = f"[{self.elapse()}] {self.name}" + self.job_id = "" + self.progress_tracking = "" + + def elapse(self): + return f"{timedelta(seconds=int(time.time() - self.start))}" + + def set_progress_tracking(self, progress_tracking): + self.progress_tracking = progress_tracking + " " + + def submitted(self, job_id): + self.job_id = job_id + self.show_str = f"{self.progress_tracking}[{self.elapse()}]{self.job_id} submitted {self.name}" + + def running(self, status, progress): + if progress is None: + progress = 0 + self.show_str = f"{self.progress_tracking}[{self.elapse()}]{self.job_id} {status} {progress:3}% {self.name}" + + def exception(self, exception_id): + self.show_str = f"{self.progress_tracking}[{self.elapse()}]{self.name} exception({exception_id}): {self.job_id}" + + def final(self, status): + self.show_str = f"{self.progress_tracking}[{self.elapse()}]{self.job_id} {status} {self.name}" + + def show(self): + return self.show_str + + +class JobStatus(object): + WAITING = 'waiting' + READY = 'ready' + RUNNING = "running" + CANCELED = "canceled" + TIMEOUT = "timeout" + FAILED = "failed" + PASS = "pass" + SUCCESS = "success" diff --git a/python/fate_test/_io.py b/python/fate_test/_io.py 
new file mode 100644 index 0000000..edfaeee --- /dev/null +++ b/python/fate_test/_io.py @@ -0,0 +1,70 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import click +import loguru + +from fate_test._ascii import HEAD, TAIL, BENCHMARK + + +# noinspection PyPep8Naming +class echo(object): + _file = None + + @classmethod + def set_file(cls, file): + cls._file = file + + @classmethod + def echo(cls, message, **kwargs): + click.secho(message, **kwargs) + click.secho(message, file=cls._file, **kwargs) + + @classmethod + def file(cls, message, **kwargs): + click.secho(message, file=cls._file, **kwargs) + + @classmethod + def stdout(cls, message, **kwargs): + click.secho(message, **kwargs) + + @classmethod + def stdout_newline(cls): + click.secho("") + + @classmethod + def welcome(cls, banner_type="testsuite"): + if banner_type == "testsuite": + cls.echo(HEAD) + elif banner_type == "benchmark": + cls.echo(BENCHMARK) + + @classmethod + def farewell(cls): + cls.echo(TAIL) + + @classmethod + def flush(cls): + import sys + sys.stdout.flush() + + +def set_logger(name): + loguru.logger.remove() + loguru.logger.add(name, level='ERROR', delay=True) + return loguru.logger + + +LOGGER = loguru.logger diff --git a/python/fate_test/_parser.py b/python/fate_test/_parser.py new file mode 100644 index 0000000..ee183a1 --- /dev/null +++ b/python/fate_test/_parser.py @@ -0,0 +1,399 @@ +# +# Copyright 2019 The FATE Authors. 
All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import typing +from pathlib import Path + +import prettytable +from fate_test import _config +from fate_test._config import Config +from fate_test._io import echo +from fate_test.utils import TxtStyle +# import json +from ruamel import yaml + + +# noinspection PyPep8Naming +class chain_hook(object): + def __init__(self): + self._hooks = [] + + def add_hook(self, hook): + self._hooks.append(hook) + return self + + def add_extend_namespace_hook(self, namespace): + self.add_hook(_namespace_hook(namespace)) + return self + + def add_replace_hook(self, mapping): + self.add_hook(_replace_hook(mapping)) + + def hook(self, d): + return self._chain_hooks(self._hooks, d) + + @staticmethod + def _chain_hooks(hook_funcs, d): + for hook_func in hook_funcs: + if d is None: + return + d = hook_func(d) + return d + + +DATA_LOAD_HOOK = chain_hook() +CONF_LOAD_HOOK = chain_hook() +DSL_LOAD_HOOK = chain_hook() + + +class Data(object): + def __init__(self, config: dict, role_str: str): + self.config = config + self.file = config.get("file", "") + self.meta = config.get("meta", {}) + self.partitions = config.get("partitions", 4) + self.head = config.get("head", True) + self.extend_sid = config.get("extend_sid", True) + self.namespace = config.get("namespace", "") + self.table_name = config.get("table_name", "") + self.role_str = role_str + + @staticmethod + def load(config, path: Path): + kwargs = {} + for 
field_name in config.keys(): + if field_name not in ["file", "role"]: + kwargs[field_name] = config[field_name] + # if config.get("engine", {}) != "PATH": + file_path = path.parent.joinpath(config["file"]).resolve() + if not file_path.exists(): + kwargs["file"] = config["file"] + else: + kwargs["file"] = file_path + role_str = config.get("role") if config.get("role") != "guest" else "guest_0" + return Data(config=kwargs, role_str=role_str) + + def update(self, config: Config): + if config.extend_sid is not None: + self.extend_sid = config.extend_sid + """if config.meta is not None: + self.meta.update(config.meta)""" + + +class PipelineJob(object): + def __init__(self, job_name: str, script_path: Path): + self.job_name = job_name + self.script_path = script_path + + +class Testsuite(object): + def __init__( + self, + dataset: typing.List[Data], + # jobs: typing.List[Job], + pipeline_jobs: typing.List[PipelineJob], + path: Path, + ): + self.dataset = dataset + # self.jobs = jobs + self.pipeline_jobs = pipeline_jobs + self.path = path + self.suite_name = Path(self.path).stem + self._final_status: typing.MutableMapping[str, FinalStatus] = {} + """ + self._dependency: typing.MutableMapping[str, typing.List[Job]] = {} + self._ready_jobs = deque() + for job in self.jobs: + for name in job.pre_works: + self._dependency.setdefault(name, []).append(job) + + self._final_status[job.job_name] = FinalStatus(job.job_name) + if job.is_submit_ready(): + self._ready_jobs.appendleft(job)""" + + for job in self.pipeline_jobs: + self._final_status[job.job_name] = FinalStatus(job.job_name) + + @staticmethod + def load(path: Path, provider): + with path.open("r") as f: + # testsuite_config = json.load(f, object_hook=DATA_LOAD_HOOK.hook) + testsuite_config = yaml.safe_load(f) + # testsuite_config = DATA_LOAD_HOOK.hook(testsuite_config) + + dataset = [] + for d in testsuite_config.get("data"): + d = DATA_LOAD_HOOK.hook(d) + """if "use_local_data" not in d: + d.update({"use_local_data": 
_config.use_local_data})""" + dataset.append(Data.load(d, path)) + + pipeline_jobs = [] + if testsuite_config.get("tasks", None) is not None and provider is not None: + echo.echo('[Warning] Pipeline does not support parameter: provider-> {}'.format(provider)) + for job_name, job_configs in testsuite_config.get("tasks", {}).items(): + script_path = path.parent.joinpath(job_configs["script"]).resolve() + pipeline_jobs.append(PipelineJob(job_name, script_path)) + + testsuite = Testsuite(dataset, pipeline_jobs, path) + return testsuite + + """def jobs_iter(self) -> typing.Generator[Job, None, None]: + while self._ready_jobs: + yield self._ready_jobs.pop()""" + + @staticmethod + def style_table(txt): + colored_txt = txt.replace("success", f"{TxtStyle.TRUE_VAL}success{TxtStyle.END}") + colored_txt = colored_txt.replace("failed", f"{TxtStyle.FALSE_VAL}failed{TxtStyle.END}") + colored_txt = colored_txt.replace("not submitted", f"{TxtStyle.FALSE_VAL}not submitted{TxtStyle.END}") + return colored_txt + + def pretty_final_summary(self, time_consuming, suite_file=None): + """table = prettytable.PrettyTable( + ["job_name", "job_id", "status", "time_consuming", "exception_id", "rest_dependency"] + )""" + table = prettytable.PrettyTable() + table.set_style(prettytable.ORGMODE) + field_names = ["job_name", "job_id", "status", "time_consuming", "exception_id", "rest_dependency"] + table.field_names = field_names + for status in self.get_final_status().values(): + if status.status != "success": + status.suite_file = suite_file + _config.non_success_jobs.append(status) + if status.exception_id != "-": + exception_id_txt = f"{TxtStyle.FALSE_VAL}{status.exception_id}{TxtStyle.END}" + else: + exception_id_txt = f"{TxtStyle.FIELD_VAL}{status.exception_id}{TxtStyle.END}" + table.add_row( + [ + f"{TxtStyle.FIELD_VAL}{status.name}{TxtStyle.END}", + f"{TxtStyle.FIELD_VAL}{status.job_id}{TxtStyle.END}", + self.style_table(status.status), + f"{TxtStyle.FIELD_VAL}{time_consuming.pop(0) if 
status.job_id != '-' else '-'}{TxtStyle.END}", + f"{exception_id_txt}", + f"{TxtStyle.FIELD_VAL}{','.join(status.rest_dependency)}{TxtStyle.END}", + ] + ) + + return table.get_string(title=f"{TxtStyle.TITLE}Testsuite Summary: {self.suite_name}{TxtStyle.END}") + + """def model_in_dep(self, name): + return name in self._dependency + + def get_dependent_jobs(self, name): + return self._dependency[name] + + def remove_dependency(self, name): + del self._dependency[name] + + def feed_dep_info(self, job, name, model_info=None, table_info=None, cache_info=None, model_loader_info=None): + if model_info is not None: + job.set_pre_work(name, **model_info) + if table_info is not None: + job.set_input_data(table_info["hierarchy"], table_info["table_info"]) + if cache_info is not None: + job.set_input_data(cache_info["hierarchy"], cache_info["cache_info"]) + if model_loader_info is not None: + job.set_input_data(model_loader_info["hierarchy"], model_loader_info["model_loader_info"]) + if name in job.pre_works: + job.pre_works.remove(name) + if job.is_submit_ready(): + self._ready_jobs.appendleft(job) + + def reflash_configs(self, config: Config): + failed = [] + for job in self.jobs: + try: + job.job_conf.update( + config.parties, None, {}, {} + ) + except ValueError as e: + failed.append((job, e)) + return failed + """ + + def update_status( + self, job_name, job_id: str = None, status: str = None, exception_id: str = None + ): + for k, v in locals().items(): + if k != "job_name" and v is not None: + setattr(self._final_status[job_name], k, v) + + def get_final_status(self): + """for name, jobs in self._dependency.items(): + for job in jobs: + self._final_status[job.job_name].rest_dependency.append(name)""" + return self._final_status + + +class FinalStatus(object): + def __init__( + self, + name: str, + job_id: str = "-", + status: str = "not submitted", + exception_id: str = "-", + rest_dependency: typing.List[str] = None, + ): + self.name = name + self.job_id = job_id + 
self.status = status + self.exception_id = exception_id + self.rest_dependency = rest_dependency or [] + self.suite_file = None + + +class BenchmarkJob(object): + def __init__(self, job_name: str, script_path: Path, conf_path: Path): + self.job_name = job_name + self.script_path = script_path + self.conf_path = conf_path + + +class BenchmarkPair(object): + def __init__( + self, pair_name: str, jobs: typing.List[BenchmarkJob], compare_setting: dict + ): + self.pair_name = pair_name + self.jobs = jobs + self.compare_setting = compare_setting + + +class BenchmarkSuite(object): + def __init__( + self, dataset: typing.List[Data], pairs: typing.List[BenchmarkPair], path: Path + ): + self.dataset = dataset + self.pairs = pairs + self.path = path + + @staticmethod + def load(path: Path): + with path.open("r") as f: + # testsuite_config = json.load(f, object_hook=DATA_JSON_HOOK.hook) + testsuite_config = yaml.safe_load(f) + # testsuite_config = DATA_JSON_HOOK.hook(testsuite_config) + + dataset = [] + for d in testsuite_config.get("data"): + d = DATA_LOAD_HOOK.hook(d) + dataset.append(Data.load(d, path)) + + pairs = [] + for pair_name, pair_configs in testsuite_config.items(): + if pair_name == "data": + continue + jobs = [] + for job_name, job_configs in pair_configs.items(): + if job_name == "compare_setting": + continue + script_path = path.parent.joinpath(job_configs["script"]).resolve() + if job_configs.get("conf"): + conf_path = path.parent.joinpath(job_configs["conf"]).resolve() + else: + conf_path = "" + jobs.append( + BenchmarkJob( + job_name=job_name, script_path=script_path, conf_path=conf_path + ) + ) + compare_setting = pair_configs.get("compare_setting") + if compare_setting and not isinstance(compare_setting, dict): + raise ValueError( + f"expected 'compare_setting' type is dict, received {type(compare_setting)} instead." 
+ ) + pairs.append( + BenchmarkPair( + pair_name=pair_name, jobs=jobs, compare_setting=compare_setting + ) + ) + suite = BenchmarkSuite(dataset=dataset, pairs=pairs, path=path) + return suite + + +class PerformanceSuite(object): + def __init__( + self, dataset: typing.List[Data], pipeline_jobs: typing.List[BenchmarkJob], path: Path + ): + self.dataset = dataset + self.pipeline_jobs = pipeline_jobs + self.path = path + + @staticmethod + def load(path: Path): + with path.open("r") as f: + # testsuite_config = json.load(f, object_hook=DATA_JSON_HOOK.hook) + testsuite_config = yaml.safe_load(f) + # testsuite_config = DATA_JSON_HOOK.hook(testsuite_config) + + dataset = [] + for d in testsuite_config.get("data"): + d = DATA_LOAD_HOOK.hook(d) + dataset.append(Data.load(d, path)) + + pipeline_jobs = [] + for job_name, job_configs in testsuite_config.get("tasks", {}).items(): + script_path = path.parent.joinpath(job_configs["script"]).resolve() + config_path = path.parent.joinpath(job_configs.get("conf", "")).resolve() + pipeline_jobs.append(BenchmarkJob(job_name, script_path, config_path)) + + suite = PerformanceSuite(dataset, pipeline_jobs, path) + return suite + + +def non_success_summary(): + status = {} + for job in _config.non_success_jobs: + if job.status not in status.keys(): + status[job.status] = prettytable.PrettyTable( + ["testsuite_name", "job_name", "job_id", "status", "exception_id", "rest_dependency"] + ) + + status[job.status].add_row( + [ + job.suite_file, + job.name, + job.job_id, + job.status, + job.exception_id, + ",".join(job.rest_dependency), + ] + ) + for k, v in status.items(): + echo.echo("\n" + "#" * 60) + echo.echo(v.get_string(title=f"{k} job record"), fg='red') + + +def _namespace_hook(namespace): + def _hook(d): + if d is None: + return d + if "namespace" in d and namespace: + d["namespace"] = f"{d['namespace']}_{namespace}" + return d + + return _hook + + +def _replace_hook(mapping: dict): + def _hook(d): + for k, v in mapping.items(): + if k 
in d: + d[k] = v + return d + + return _hook diff --git a/python/fate_test/fate_test_config.yaml b/python/fate_test/fate_test_config.yaml new file mode 100644 index 0000000..1adbc26 --- /dev/null +++ b/python/fate_test/fate_test_config.yaml @@ -0,0 +1,37 @@ +# base dir for data upload conf eg, data_base_dir={FATE} +# also used for accessing local files when running standalone mode +# examples/data/breast_hetero_guest.csv -> $data_base_dir/examples/data/breast_hetero_guest.csv +data_base_dir: path(FATE) + +# directory dedicated to fate_test job file storage, default cache location={FATE}/examples/cache/ +cache_directory: examples/cache/ +# directory stores performance benchmark suites, default location={FATE}/examples/benchmark_performance +performance_template_directory: examples/benchmark_performance/ +# directory stores flow test config, default location={FATE}/examples/flow_test_template/hetero_lr/flow_test_config.yaml +flow_test_config_directory: examples/flow_test_template/hetero_lr/flow_test_config.yaml + +# directory stores testsuite file with min_test data sets to upload, +# default location={FATE}/examples/data/upload_config/min_test_data_testsuite.json +min_test_data_config: examples/data/upload_config/min_test_data_testsuite.json +# directory stores testsuite file with all example data sets to upload, +# default location={FATE}/examples/data/upload_config/all_examples_data_testsuite.json +all_examples_data_config: examples/data/upload_config/all_examples_data_testsuite.json + +# directory where FATE code locates, default installation location={FATE}/fate +# python/ml -> $fate_base/python/ml +fate_base: path(FATE)/fate + +# whether to delete data in suites after all jobs done +clean_data: true + + +# participating parties' id and corresponding flow service ip & port information +parties: + guest: [ 9999 ] + host: [ 10000, 9999 ] + arbiter: [ 10000 ] +services: + - flow_services: + - { address: 127.0.0.1:9380, parties: [ 9999, 10000 ] } + serving_setting: 
+ address: 127.0.0.1:8059 diff --git a/python/fate_test/scripts/__init__.py b/python/fate_test/scripts/__init__.py new file mode 100644 index 0000000..878d3a9 --- /dev/null +++ b/python/fate_test/scripts/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/fate_test/scripts/_options.py b/python/fate_test/scripts/_options.py new file mode 100644 index 0000000..d32e971 --- /dev/null +++ b/python/fate_test/scripts/_options.py @@ -0,0 +1,67 @@ +import time + +import click +from fate_test._config import parse_config, default_config +from fate_test.scripts._utils import _set_namespace + + +class SharedOptions(object): + _options = { + "config": (('-c', '--config'), + dict(type=click.Path(exists=True), help=f"Manual specify config file", default=None), + default_config().__str__()), + "namespace": (('-n', '--namespace'), + dict(type=str, help=f"Manual specify fate_test namespace", default=None), + time.strftime('%Y%m%d%H%M%S')), + "namespace_mangling": (('-nm', '--namespace-mangling',), + dict(type=bool, is_flag=True, help="Mangling data namespace", default=None), + False), + "yes": (('-y', '--yes',), dict(type=bool, is_flag=True, help="Skip double check", default=None), + False), + "extend_sid": (('--extend-sid',), + dict(type=bool, is_flag=True, help="whether to append uuid as sid when uploading data", + default=None), None), + # "auto_increasing_sid": 
(('--auto_increasing_sid',), + # dict(type=bool, is_flag=True, help="whether to generate sid value starting at 0", + # default=None), None), + # "mode": (('--mode',), dict(type=click.Choice(["cluster", "standalone"]), default="cluster", + # help="job mode, choose from 'cluster' or 'standalone'"), None) + } + + def __init__(self): + self._options_kwargs = {} + + def __getitem__(self, item): + return self._options_kwargs[item] + + def get(self, k, default=None): + v = self._options_kwargs.get(k, default) + if v is None and k in self._options: + v = self._options[k][2] + return v + + def update(self, **kwargs): + for k, v in kwargs.items(): + if v is not None: + self._options_kwargs[k] = v + + def post_process(self): + # add defaults here + for k, v in self._options.items(): + if self._options_kwargs.get(k, None) is None: + self._options_kwargs[k] = v[2] + + # update config + config = parse_config(self._options_kwargs['config']) + self._options_kwargs['config'] = config + + _set_namespace(self._options_kwargs['namespace_mangling'], self._options_kwargs['namespace']) + + @classmethod + def get_shared_options(cls, hidden=False): + def shared_options(f): + for name, option in cls._options.items(): + f = click.option(*option[0], **dict(option[1], hidden=hidden))(f) + return f + + return shared_options diff --git a/python/fate_test/scripts/_utils.py b/python/fate_test/scripts/_utils.py new file mode 100644 index 0000000..d0f9dd9 --- /dev/null +++ b/python/fate_test/scripts/_utils.py @@ -0,0 +1,191 @@ +import glob as glob_ +import importlib +import os +import time +import uuid +from pathlib import Path + +import click +from fate_test._client import Clients +from fate_test._config import Config +from fate_test._flow_client import DataProgress, UploadDataResponse, QueryJobResponse +from fate_test._io import echo, LOGGER, set_logger +from fate_test._parser import Testsuite, BenchmarkSuite, PerformanceSuite, DATA_LOAD_HOOK, CONF_LOAD_HOOK, DSL_LOAD_HOOK + + +def 
_big_data_task(includes, guest_data_size, host_data_size, guest_feature_num, host_feature_num, host_data_type, + config_inst, encryption_type, match_rate, sparsity, force, split_host, output_path): + from fate_test.scripts import generate_mock_data + + def _find_testsuite_files(path): + suffix = ["testsuite.yaml", "benchmark.yaml", "performance.yaml"] + if isinstance(path, str): + path = Path(path) + if path.is_file(): + if path.name.endswith(suffix[0]) or path.name.endswith(suffix[1]) or path.name.endswith(suffix[2]): + paths = [path] + else: + LOGGER.warning(f"{path} is file, but not end with `{suffix}`, skip") + paths = [] + return [p.resolve() for p in paths] + else: + os.path.abspath(path) + paths = glob_.glob(f"{path}/*{suffix[0]}") + glob_.glob(f"{path}/*{suffix[1]}") + return [Path(p) for p in paths] + + for include in includes: + if isinstance(include, str): + include_paths = Path(include) + include_paths = _find_testsuite_files(include_paths) + for include_path in include_paths: + generate_mock_data.get_big_data(guest_data_size, host_data_size, guest_feature_num, host_feature_num, + include_path, host_data_type, config_inst, encryption_type, + match_rate, sparsity, force, split_host, output_path) + + +def _load_testsuites(includes, excludes, glob, provider=None, suffix="testsuite.yaml", suite_type="testsuite"): + def _find_testsuite_files(path): + if isinstance(path, str): + path = Path(path) + if path.is_file(): + if path.name.endswith(suffix): + paths = [path] + else: + LOGGER.warning(f"{path} is file, but not end with `{suffix}`, skip") + paths = [] + else: + paths = path.glob(f"**/*{suffix}") + return [p.resolve() for p in paths] + + excludes_set = set() + for exclude in excludes: + excludes_set.update(_find_testsuite_files(exclude)) + + suite_paths = set() + for include in includes: + if isinstance(include, str): + include = Path(include) + + # glob + if glob is not None and include.is_dir(): + include_list = include.glob(glob) + else: + include_list 
= [include] + for include_path in include_list: + for suite_path in _find_testsuite_files(include_path): + if suite_path not in excludes_set: + suite_paths.add(suite_path) + suites = [] + for suite_path in suite_paths: + try: + if suite_type == "testsuite": + suite = Testsuite.load(suite_path.resolve(), provider) + elif suite_type == "benchmark": + suite = BenchmarkSuite.load(suite_path.resolve()) + elif suite_type == "performance": + suite = PerformanceSuite.load(suite_path.resolve()) + else: + raise ValueError(f"Unsupported suite type: {suite_type}. Only accept type 'testsuite', 'benchmark' or 'performance'.") + except Exception as e: + echo.stdout(f"load suite {suite_path} failed: {e}") + else: + suites.append(suite) + return suites + + +@LOGGER.catch +def _upload_data(clients: Clients, suite, config: Config, output_path=None): + with click.progressbar(length=len(suite.dataset), + label="dataset", + show_eta=False, + show_pos=True, + width=24) as bar: + for i, data in enumerate(suite.dataset): + data.update(config) + data_progress = DataProgress(f"{data.role_str}<-{data.namespace}.{data.table_name}") + + def update_bar(n_step): + bar.item_show_func = lambda x: data_progress.show() + time.sleep(0.1) + bar.update(n_step) + + def _call_back(resp): + if isinstance(resp, UploadDataResponse): + data_progress.submitted(resp.job_id) + echo.file(f"[dataset]{resp.job_id}") + if isinstance(resp, QueryJobResponse): + data_progress.update() + update_bar(0) + + try: + echo.stdout_newline() + # role, idx = data.role_str.lower().split("_") + # party_id = config.role[role][int(idx)] + status = clients[data.role_str].transform_local_file_to_dataframe(data, + _call_back, + output_path) + time.sleep(1) + data_progress.update() + if status != 'success': + raise RuntimeError(f"uploading {i + 1}th data for {suite.path} {status}") + bar.update(1) + + """if _config.data_switch: + from fate_test.scripts import generate_mock_data + + generate_mock_data.remove_file(data_path)""" + except Exception: + 
exception_id = str(uuid.uuid1()) + echo.file(f"exception({exception_id})") + LOGGER.exception(f"exception id: {exception_id}") + echo.echo(f"upload {i + 1}th data {data.config} to {data.role_str} fail, exception_id: {exception_id}") + # raise RuntimeError(f"exception uploading {i + 1}th data") from e + + +def _delete_data(clients: Clients, suite: Testsuite): + with click.progressbar(length=len(suite.dataset), + label="delete ", + show_eta=False, + show_pos=True, + width=24) as bar: + for data in suite.dataset: + # noinspection PyBroadException + try: + table_name = data.config['table_name'] if data.config.get( + 'table_name', None) is not None else data.config.get('name') + bar.item_show_func = \ + lambda x: f"delete table: name={table_name}, namespace={data.config['namespace']}" + clients[data.role_str].delete_data(data) + except Exception: + LOGGER.exception( + f"delete failed: name={table_name}, namespace={data.config['namespace']}") + + time.sleep(0.5) + bar.update(1) + echo.stdout_newline() + + +def _load_module_from_script(script_path): + module_name = str(script_path).split("/", -1)[-1].split(".")[0] + loader = importlib.machinery.SourceFileLoader(module_name, str(script_path)) + spec = importlib.util.spec_from_loader(loader.name, loader) + mod = importlib.util.module_from_spec(spec) + loader.exec_module(mod) + return mod + + +def _set_namespace(data_namespace_mangling, namespace): + Path(f"logs/{namespace}").mkdir(exist_ok=True, parents=True) + set_logger(f"logs/{namespace}/exception.log") + echo.set_file(click.open_file(f'logs/{namespace}/stdout', "a")) + + if data_namespace_mangling: + echo.echo(f"add data_namespace_mangling: _{namespace}") + DATA_LOAD_HOOK.add_extend_namespace_hook(namespace) + CONF_LOAD_HOOK.add_extend_namespace_hook(namespace) + + +def _add_replace_hook(replace): + DATA_LOAD_HOOK.add_replace_hook(replace) + CONF_LOAD_HOOK.add_replace_hook(replace) + DSL_LOAD_HOOK.add_replace_hook(replace) diff --git 
a/python/fate_test/scripts/benchmark_cli.py b/python/fate_test/scripts/benchmark_cli.py new file mode 100644 index 0000000..365484a --- /dev/null +++ b/python/fate_test/scripts/benchmark_cli.py @@ -0,0 +1,163 @@ +import os +import re +import time +import uuid +from datetime import timedelta +from inspect import signature + +import click +from fate_test._client import Clients +from fate_test._config import Config +from fate_test._io import LOGGER, echo +from fate_test._parser import BenchmarkSuite +from fate_test.scripts._options import SharedOptions +from fate_test.scripts._utils import _upload_data, _delete_data, _load_testsuites, _load_module_from_script +from fate_test.utils import show_data, match_metrics + +DATA_DISPLAY_PATTERN = re.compile("^FATE") + + +@click.command(name="benchmark-quality") +@click.option('-i', '--include', required=True, type=click.Path(exists=True), multiple=True, metavar="", + help="include *benchmark.yaml under these paths") +@click.option('-e', '--exclude', type=click.Path(exists=True), multiple=True, + help="exclude *benchmark.yaml under these paths") +@click.option('-p', '--task-cores', type=int, help="processors per node", default=None) +@click.option('-m', '--timeout', type=int, default=None, + help="maximum running time of job") +@click.option('-g', '--glob', type=str, + help="glob string to filter sub-directory of path specified by ") +@click.option('-t', '--tol', type=float, + help="tolerance (absolute error) for metrics to be considered almost equal. 
" + "Comparison is done by evaluating abs(a-b) <= max(relative_tol * max(abs(a), abs(b)), absolute_tol)") +@click.option('-s', '--storage-tag', type=str, + help="tag for storing metrics, for future metrics info comparison") +@click.option('-v', '--history-tag', type=str, multiple=True, + help="Extract metrics info from history tags for comparison") +@click.option('-d', '--match-details', type=click.Choice(['all', 'relative', 'absolute', 'none']), + default="all", help="Error value display in algorithm comparison") +@click.option('--skip-data', is_flag=True, default=False, + help="skip uploading data specified in benchmark conf") +@click.option("--data-only", is_flag=True, default=False, + help="upload data only") +@click.option("--disable-clean-data", "clean_data", flag_value=False, default=None) +@click.option("--enable-clean-data", "clean_data", flag_value=True, default=None) +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def run_benchmark(ctx, include, exclude, glob, skip_data, tol, clean_data, storage_tag, history_tag, match_details, + task_cores, timeout, **kwargs): + """ + process benchmark suite, alias: bq + """ + ctx.obj.update(**kwargs) + ctx.obj.post_process() + namespace = ctx.obj["namespace"] + config_inst = ctx.obj["config"] + if ctx.obj["extend_sid"] is not None: + config_inst.extend_sid = ctx.obj["extend_sid"] + if task_cores is not None: + config_inst.update_conf(task_cores=task_cores) + if timeout is not None: + config_inst.update_conf(timeout=timeout) + + """if ctx.obj["auto_increasing_sid"] is not None: + config_inst.auto_increasing_sid = ctx.obj["auto_increasing_sid"]""" + if clean_data is None: + clean_data = config_inst.clean_data + data_namespace_mangling = ctx.obj["namespace_mangling"] + yes = ctx.obj["yes"] + + echo.welcome("benchmark") + echo.echo(f"testsuite namespace: {namespace}", fg='red') + echo.echo("loading testsuites:") + suites = _load_testsuites(includes=include, excludes=exclude, glob=glob, + 
suffix="benchmark.yaml", suite_type="benchmark") + for suite in suites: + echo.echo(f"\tdataset({len(suite.dataset)}) benchmark groups({len(suite.pairs)}) {suite.path}") + if not yes and not click.confirm("running?"): + return + client = Clients(config_inst) + fate_version = client["guest_0"].get_version() + for i, suite in enumerate(suites): + # noinspection PyBroadException + try: + start = time.time() + echo.echo(f"[{i + 1}/{len(suites)}]start at {time.strftime('%Y-%m-%d %X')} {suite.path}", fg='red') + if not skip_data: + try: + _upload_data(client, suite, config_inst) + except Exception as e: + raise RuntimeError(f"exception occurred while uploading data for {suite.path}") from e + if kwargs.get("data_only"): + continue + try: + _run_benchmark_pairs(config_inst, suite, tol, namespace, data_namespace_mangling, storage_tag, + history_tag, fate_version, match_details) + except Exception as e: + raise RuntimeError(f"exception occurred while running benchmark jobs for {suite.path}") from e + + if not skip_data and clean_data: + _delete_data(client, suite) + echo.echo(f"[{i + 1}/{len(suites)}]elapsed {timedelta(seconds=int(time.time() - start))}", fg='red') + + except Exception: + exception_id = uuid.uuid1() + echo.echo(f"exception in {suite.path}, exception_id={exception_id}", err=True, fg='red') + LOGGER.exception(f"exception id: {exception_id}") + finally: + echo.stdout_newline() + echo.farewell() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + + +@LOGGER.catch +def _run_benchmark_pairs(config: Config, suite: BenchmarkSuite, tol: float, namespace: str, + data_namespace_mangling: bool, storage_tag, history_tag, fate_version, match_details): + # pipeline demo goes here + pair_n = len(suite.pairs) + fate_base = config.fate_base + PYTHONPATH = os.environ.get('PYTHONPATH', '') + ":" + os.path.join(fate_base, "python") + os.environ['PYTHONPATH'] = PYTHONPATH + for i, pair in enumerate(suite.pairs): + echo.echo(f"Running [{i + 1}/{pair_n}] group: {pair.pair_name}") + 
results = {} + # data_summary = None + job_n = len(pair.jobs) + for j, job in enumerate(pair.jobs): + try: + echo.echo(f"Running [{j + 1}/{job_n}] job: {job.job_name}") + job_name, script_path, conf_path = job.job_name, job.script_path, job.conf_path + param = Config.load_from_file(conf_path) + mod = _load_module_from_script(script_path) + input_params = signature(mod.main).parameters + # local script + if len(input_params) == 1: + data, metric = mod.main(param=param) + elif len(input_params) == 2: + data, metric = mod.main(config=config, param=param) + # pipeline script + elif len(input_params) == 3: + if data_namespace_mangling: + data, metric = mod.main(config=config, param=param, namespace=f"_{namespace}") + else: + data, metric = mod.main(config=config, param=param) + else: + data, metric = mod.main() + results[job_name] = metric + echo.echo(f"[{j + 1}/{job_n}] job: {job.job_name} Success!\n") + if data and DATA_DISPLAY_PATTERN.match(job_name): + # data_summary = data + show_data(data) + # if data_summary is None: + # data_summary = data + except Exception as e: + exception_id = uuid.uuid1() + echo.echo(f"exception while running [{j + 1}/{job_n}] job, exception_id={exception_id}", err=True, + fg='red') + LOGGER.exception(f"exception id: {exception_id}, error message: \n{e}") + continue + rel_tol = (pair.compare_setting or {}).get("relative_tol") + # show_data(data_summary) + match_metrics(evaluate=True, group_name=pair.pair_name, abs_tol=tol, rel_tol=rel_tol, + storage_tag=storage_tag, history_tag=history_tag, fate_version=fate_version, + cache_directory=config.cache_directory, match_details=match_details, **results) diff --git a/python/fate_test/scripts/cli.py b/python/fate_test/scripts/cli.py new file mode 100644 index 0000000..f59bd6c --- /dev/null +++ b/python/fate_test/scripts/cli.py @@ -0,0 +1,67 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import click + +from fate_test.scripts._options import SharedOptions +from fate_test.scripts.benchmark_cli import run_benchmark +from fate_test.scripts.config_cli import config_group +from fate_test.scripts.data_cli import data_group +# from fate_test.scripts.flow_test_cli import flow_group +from fate_test.scripts.performance_cli import run_task +# from fate_test.scripts.quick_test_cli import unittest_group +# from fate_test.scripts.secure_protocol_cli import secure_protocol_group +from fate_test.scripts.testsuite_cli import run_suite + +commands = { + "config": config_group, + "suite": run_suite, + "performance": run_task, + "benchmark-quality": run_benchmark, + "data": data_group, + # "unittest": unittest_group +} + +commands_alias = { + "bq": "benchmark-quality", + "bp": "performance" +} + + +class MultiCLI(click.MultiCommand): + + def list_commands(self, ctx): + return list(commands) + + def get_command(self, ctx, name): + if name not in commands and name in commands_alias: + name = commands_alias[name] + if name not in commands: + ctx.fail("No such command '{}'.".format(name)) + return commands[name] + + +@click.command(cls=MultiCLI, help="A collection of useful tools for running FATE tests.", + context_settings=dict(help_option_names=["-h", "--help"])) +@SharedOptions.get_shared_options() +@click.pass_context +def cli(ctx, **kwargs): + ctx.ensure_object(SharedOptions) + ctx.obj.update(**kwargs) + + +if 
__name__ == '__main__': + cli(obj=SharedOptions()) diff --git a/python/fate_test/scripts/config_cli.py b/python/fate_test/scripts/config_cli.py new file mode 100644 index 0000000..55f0b4c --- /dev/null +++ b/python/fate_test/scripts/config_cli.py @@ -0,0 +1,79 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pathlib import Path + +import click +from fate_test._client import Clients +from fate_test._config import create_config, default_config, parse_config +from fate_test.scripts._options import SharedOptions + + +@click.group("config", help="fate_test config") +def config_group(): + """ + config fate_test + """ + pass + + +@config_group.command(name="new") +def _new(): + """ + create new fate_test config template + """ + create_config(Path("fate_test_config.yaml")) + click.echo("create config file: fate_test_config.yaml") + + +@config_group.command(name="edit") +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def _edit(ctx, **kwargs): + """ + edit fate_test config file + """ + ctx.obj.update(**kwargs) + config = ctx.obj.get("config") + click.edit(filename=config) + + +@config_group.command(name="show") +def _show(): + """ + show fate_test default config path + """ + click.echo(f"default config path is {default_config()}") + + +@config_group.command(name="check") +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def _config(ctx, **kwargs): + """ + check 
connection + """ + ctx.obj.update(**kwargs) + config_inst = parse_config(ctx.obj.get("config")) + with Clients(config_inst) as clients: + roles = clients.all_roles() + for r in roles: + try: + version, address = clients[r].check_connection() + except Exception as e: + click.echo(f"[X]connection fail, role is {r}, exception is {e.args}") + else: + click.echo(f"[✓]connection {address} ok, fate version is {version}, role is {r}") diff --git a/python/fate_test/scripts/data_cli.py b/python/fate_test/scripts/data_cli.py new file mode 100644 index 0000000..b50b002 --- /dev/null +++ b/python/fate_test/scripts/data_cli.py @@ -0,0 +1,422 @@ +import os +import re +import sys +import time +import uuid +from datetime import timedelta + +import click +from fate_test._client import Clients +from fate_test._config import Config +from fate_test._io import LOGGER, echo +from fate_test.scripts._options import SharedOptions +from fate_test.scripts._utils import _load_testsuites, _delete_data, _big_data_task, _upload_data +from ruamel import yaml + + +@click.group(name="data") +def data_group(): + """ + upload or delete data in suite config files + """ + ... + + +@data_group.command("upload") +@click.option('-i', '--include', required=False, type=click.Path(exists=True), multiple=True, metavar="", + help="include *benchmark.yaml under these paths") +@click.option('-e', '--exclude', type=click.Path(exists=True), multiple=True, + help="exclude *benchmark.yaml under these paths") +@click.option("-t", "--config-type", type=click.Choice(["min_test", "all_examples"]), default="min_test", + help="config file") +@click.option('-g', '--glob', type=str, + help="glob string to filter sub-directory of path specified by ") +@click.option('-s', '--suite-type', required=False, type=click.Choice(["testsuite", "benchmark", "performance"]), + default="testsuite", + help="suite type") +@click.option('-r', '--role', type=str, default='all', help="role to process, default to `all`. 
" + "use option likes: `guest_0`, `host_0`, `host`") +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def upload(ctx, include, exclude, glob, suite_type, role, config_type, **kwargs): + """ + upload data defined in suite config files + """ + ctx.obj.update(**kwargs) + ctx.obj.post_process() + namespace = ctx.obj["namespace"] + config_inst = ctx.obj["config"] + if ctx.obj["extend_sid"] is not None: + config_inst.extend_sid = ctx.obj["extend_sid"] + """if ctx.obj["auto_increasing_sid"] is not None: + config_inst.auto_increasing_sid = ctx.obj["auto_increasing_sid"]""" + yes = ctx.obj["yes"] + echo.welcome() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + client = Clients(config_inst) + if len(include) != 0: + echo.echo("loading testsuites:") + if suite_type == "benchmark": + suffix = "benchmark.yaml" + elif suite_type == "testsuite": + suffix = "testsuite.yaml" + elif suite_type == "performance": + suffix = "performance.yaml" + else: + raise ValueError(f"unknown suite type: {suite_type}") + suites = _load_testsuites(includes=include, excludes=exclude, glob=glob, + suffix=suffix, suite_type=suite_type) + for suite in suites: + if role != "all": + suite.dataset = [d for d in suite.dataset if re.match(d.role_str, role)] + echo.echo(f"\tdataset({len(suite.dataset)}) {suite.path}") + if not yes and not click.confirm("running?"): + return + + for suite in suites: + _upload_data(client, suite, config_inst) + else: + config = get_config(config_inst) + if config_type == 'min_test': + config_file = config.min_test_data_config + else: + config_file = config.all_examples_data_config + + with open(config_file, 'r', encoding='utf-8') as f: + upload_data = yaml.safe_load(f.read()) + + echo.echo(f"\tdataset({len(upload_data['data'])}) {config_file}") + if not yes and not click.confirm("running?"): + return + _upload_data(client, upload_data, config_inst) + echo.farewell() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + + 
+@data_group.command("delete") +@click.option('-i', '--include', required=True, type=click.Path(exists=True), multiple=True, metavar="", + help="include *benchmark.yaml under these paths") +@click.option('-e', '--exclude', type=click.Path(exists=True), multiple=True, + help="exclude *benchmark.yaml under these paths") +@click.option('-g', '--glob', type=str, + help="glob string to filter sub-directory of path specified by ") +@click.option('-s', '--suite-type', required=True, type=click.Choice(["testsuite", "benchmark"]), help="suite type") +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def delete(ctx, include, exclude, glob, yes, suite_type, **kwargs): + """ + delete data defined in suite config files + """ + ctx.obj.update(**kwargs) + ctx.obj.post_process() + namespace = ctx.obj["namespace"] + config_inst = ctx.obj["config"] + echo.welcome() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + echo.echo("loading testsuites:") + suffix = "benchmark.yaml" if suite_type == "benchmark" else "testsuite.yaml" + + suites = _load_testsuites(includes=include, excludes=exclude, glob=glob, + suffix=suffix, suite_type=suite_type) + if not yes and not click.confirm("running?"): + return + + for suite in suites: + echo.echo(f"\tdataset({len(suite.dataset)}) {suite.path}") + if not yes and not click.confirm("running?"): + return + client = Clients(config_inst) + for i, suite in enumerate(suites): + _delete_data(client, suite) + echo.farewell() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + + +@data_group.command("generate") +@click.option('-i', '--include', required=True, type=click.Path(exists=True), multiple=True, metavar="", + help="include *testsuite.yaml / *benchmark.yaml under these paths") +@click.option('-ht', '--host-data-type', default='tag_value', type=click.Choice(['dense', 'tag', 'tag_value']), + help="Select the format of the host data") +@click.option('-p', '--encryption-type', type=click.Choice(['sha256', 'md5']), + 
help="ID encryption method, choose between sha256 and md5") +@click.option('-m', '--match-rate', default=1.0, type=float, + help="Intersection rate relative to guest, in the range (0, 1]") +@click.option('-s', '--sparsity', default=0.2, type=float, + help="Sparsity of tag data; the value is between 0 and 1") +@click.option('-ng', '--guest-data-size', type=int, default=10000, + help="Set guest data set size, not less than 100") +@click.option('-nh', '--host-data-size', type=int, + help="Set host data set size, not less than 100; defaults to the guest data set size") +@click.option('-fg', '--guest-feature-num', type=int, default=20, + help="Set guest feature dimensions") +@click.option('-fh', '--host-feature-num', type=int, default=200, + help="Set host feature dimensions") +@click.option('-o', '--output-path', type=click.Path(exists=True), + help="Customize the output path of generated data") +@click.option('--force', is_flag=True, default=False, + help="Overwrite existing file") +@click.option('--split-host', is_flag=True, default=False, + help="Divide the amount of host data equally among all the host tables in TestSuite") +@click.option('--upload-data', is_flag=True, default=False, + help="Generated data will be uploaded") +@click.option('--remove-data', is_flag=True, default=False, + help="The generated data will be deleted") +@click.option('--use-local-data', is_flag=True, default=False, + help="The existing data of the server will be uploaded. This parameter is not recommended for " + "distributed applications") +# @click.option('--parallelize', is_flag=True, default=False, +# help="It is directly used to upload data, and will not generate data") +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def generate(ctx, include, host_data_type, encryption_type, match_rate, sparsity, guest_data_size, + host_data_size, guest_feature_num, host_feature_num, output_path, force, split_host, upload_data, + **kwargs): + """ + create data
defined in suite config files + """ + ctx.obj.update(**kwargs) + + ctx.obj.post_process() + namespace = ctx.obj["namespace"] + config_inst = ctx.obj["config"] + if ctx.obj["extend_sid"] is not None: + config_inst.extend_sid = ctx.obj["extend_sid"] + """if ctx.obj["auto_increasing_sid"] is not None: + config_inst.auto_increasing_sid = ctx.obj["auto_increasing_sid"] + if parallelize and upload_data: + upload_data = False + """ + yes = ctx.obj["yes"] + echo.welcome() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + echo.echo("loading testsuites:") + if host_data_size is None: + host_data_size = guest_data_size + suites = _load_testsuites(includes=include, excludes=tuple(), glob=None) + suites += _load_testsuites(includes=include, excludes=tuple(), glob=None, + suffix="benchmark.yaml", suite_type="benchmark") + suites += _load_testsuites(includes=include, excludes=tuple(), glob=None, + suffix="performance.yaml", suite_type="performance") + for suite in suites: + if upload_data: + echo.echo(f"\tdataget({len(suite.dataset)}) dataset({len(suite.dataset)}) {suite.path}") + else: + echo.echo(f"\tdataget({len(suite.dataset)}) {suite.path}") + if not yes and not click.confirm("running?"): + return + + _big_data_task(include, guest_data_size, host_data_size, guest_feature_num, host_feature_num, host_data_type, + config_inst, encryption_type, match_rate, sparsity, force, split_host, output_path) + if upload_data: + """if use_local_data: + _config.use_local_data = 0 + _config.data_switch = remove_data""" + client = Clients(config_inst) + for suite in suites: + _upload_data(client, suite, config_inst) + + +"""@data_group.command("download") +@click.option("-t", "--type", type=click.Choice(["mnist"]), default="mnist", + help="config file") +@click.option('-o', '--output-path', type=click.Path(exists=True), + help="output path of mnist data, the default path is examples/data") +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def download_mnists(ctx, 
output_path, **kwargs): + ctx.obj.update(**kwargs) + ctx.obj.post_process() + namespace = ctx.obj["namespace"] + config_inst = ctx.obj["config"] + yes = ctx.obj["yes"] + echo.welcome() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + + if output_path is None: + config = get_config(config_inst) + output_path = str(config.data_base_dir) + "/examples/data/" + if not yes and not click.confirm("running?"): + return + try: + download_mnist(Path(output_path), "mnist_train") + download_mnist(Path(output_path), "mnist_eval", is_train=False) + except Exception: + exception_id = uuid.uuid1() + echo.echo(f"exception_id={exception_id}") + LOGGER.exception(f"exception id: {exception_id}") + finally: + echo.stdout_newline() + echo.farewell() + echo.echo(f"testsuite namespace: {namespace}", fg='red')""" + + +@data_group.command("query_schema") +@click.option('-cpn', '--component-name', required=True, type=str, help="component name(task name)") +@click.option('-j', '--job-id', required=True, type=str, help="job id") +@click.option('-r', '--role', required=True, type=click.Choice(["guest", "host", "arbiter"]), help="role") +@click.option('-p', '--party-id', required=True, type=str, help="party id") +@click.option('-dn', '--output-data-name', required=True, type=str, help="output data name, e.g. 
'train_output_data'") +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def query_schema(ctx, component_name, job_id, role, party_id, output_data_name, **kwargs): + """ + query the meta of the output data of a component + """ + ctx.obj.update(**kwargs) + ctx.obj.post_process() + config_inst = ctx.obj["config"] + namespace = ctx.obj["namespace"] + echo.echo(f"testsuite namespace: {namespace}", fg='red') + """ + yes = ctx.obj["yes"] + echo.welcome() + + if not yes and not click.confirm("running?"): + return""" + client = Clients(config_inst) + query_component_output_data(client, config_inst, component_name, job_id, role, party_id, output_data_name) + # echo.farewell() + # echo.echo(f"testsuite namespace: {namespace}", fg='red') + + +def get_config(conf: Config): + return conf + + +def query_component_output_data(clients, config: Config, component_name, job_id, role, party_id, output_data_name): + roles = config.role + clients_role = None + for k, v in roles.items(): + if int(party_id) in v and k == role: + clients_role = role + "_" + str(v.index(int(party_id))) + try: + if clients_role is None: + raise ValueError(f"party id {party_id} does not exist") + + try: + table_info = clients[clients_role].output_data_table(job_id=job_id, role=role, party_id=party_id, + task_name=component_name, + output_data_name=output_data_name) + table_info = clients[clients_role].table_query(table_name=table_info['name'], + namespace=table_info['namespace']) + except Exception as e: + raise RuntimeError(f"An exception occurred while getting data {clients_role}<-{component_name}") from e + + echo.echo("query_component_output_data result: {}".format(table_info)) + try: + header = table_info['data']['schema']['header'] + except (KeyError, TypeError) as e: + raise ValueError(f"Failed to obtain header from table info, error msg: {e}") from e + + result = [] + for idx, header_name in enumerate(header[1:]): + result.append((idx, header_name)) + echo.echo("Queried header is {}".format(result)) + except
Exception: + exception_id = uuid.uuid1() + echo.echo(f"exception_id={exception_id}") + LOGGER.exception(f"exception id: {exception_id}") + finally: + echo.stdout_newline() + + +def download_mnist(base, name, is_train=True): + import torchvision + + dataset = torchvision.datasets.MNIST( + root=base.joinpath(".cache"), train=is_train, download=True + ) + converted_path = base.joinpath(name) + converted_path.mkdir(exist_ok=True) + + inputs_path = converted_path.joinpath("images") + inputs_path.mkdir(exist_ok=True) + targets_path = converted_path.joinpath("targets") + config_path = converted_path.joinpath("config.yaml") + filenames_path = converted_path.joinpath("filenames") + + with filenames_path.open("w") as filenames: + with targets_path.open("w") as targets: + for idx, (img, target) in enumerate(dataset): + filename = f"{idx:05d}" + # save img + img.save(inputs_path.joinpath(f"{filename}.jpg")) + # save target + targets.write(f"{filename},{target}\n") + # save filenames + filenames.write(f"{filename}\n") + + config = { + "type": "vision", + "inputs": {"type": "images", "ext": "jpg", "PIL_mode": "L"}, + "targets": {"type": "integer"}, + } + with config_path.open("w") as f: + yaml.safe_dump(config, f, indent=2, default_flow_style=False) + + +def data_upload(clients, conf: Config, upload_config): + def _await_finish(job_id, task_name=None): + deadline = time.time() + sys.maxsize + start = time.time() + param = dict( + job_id=job_id, + role=None + ) + while True: + stdout = clients["guest_0"].flow_client("job/query", param) + status = stdout["data"][0]["f_status"] + elapse_seconds = int(time.time() - start) + date = time.strftime('%Y-%m-%d %X') + if task_name: + log_msg = f"[{date}][{task_name}]{status}, elapse: {timedelta(seconds=elapse_seconds)}" + else: + log_msg = f"[{date}]{job_id} {status}, elapse: {timedelta(seconds=elapse_seconds)}" + if (status == "running" or status == "waiting") and time.time() < deadline: + print(log_msg, end="\r") + time.sleep(1) + 
continue + else: + print(" " * 60, end="\r") # clear line + echo.echo(log_msg) + return status + + task_data = upload_config["data"] + for i, data in enumerate(task_data): + format_msg = f"@{data['file']} >> {data['namespace']}.{data['table_name']}" + echo.echo(f"[{time.strftime('%Y-%m-%d %X')}]uploading {format_msg}") + try: + data["file"] = str(os.path.join(conf.data_base_dir, data["file"])) + param = dict( + file=data["file"], + head=data["head"], + partition=data["partition"], + table_name=data["table_name"], + namespace=data["namespace"] + ) + stdout = clients["guest_0"].flow_client("data/upload", param, drop=1) + job_id = stdout.get('jobId', None) + echo.echo(f"[{time.strftime('%Y-%m-%d %X')}]upload done {format_msg}, job_id={job_id}\n") + if job_id is None: + echo.echo("table already exists. To upload again, please add '-f 1' to the start command") + continue + _await_finish(job_id) + param = dict( + table_name=data["table_name"], + namespace=data["namespace"] + ) + stdout = clients["guest_0"].flow_client("table/info", param) + + count = stdout["data"]["count"] + if count != data["count"]: + raise AssertionError("Count of uploaded file is not as expected, count is: {}, " + "expected: {}".format(count, data["count"])) + echo.echo(f"[{time.strftime('%Y-%m-%d %X')}] check_data_out {stdout} \n") + except Exception as e: + exception_id = uuid.uuid1() + echo.echo(f"exception in {data['file']}, exception_id={exception_id}") + LOGGER.exception(f"exception id: {exception_id}") + echo.echo(f"upload of data #{i + 1} ({data['table_name']}) failed, exception_id: {exception_id}") + # raise RuntimeError(f"exception occur while uploading data for {data['file']}") from e + finally: + echo.stdout_newline() diff --git a/python/fate_test/scripts/generate_mock_data.py b/python/fate_test/scripts/generate_mock_data.py new file mode 100644 index 0000000..c073e0c --- /dev/null +++ b/python/fate_test/scripts/generate_mock_data.py @@ -0,0 +1,249 @@ +import hashlib +import os +import random +import sys
+import threading +import time +import uuid + +import numpy as np +import pandas as pd +from fate_test._config import Config +from fate_test._io import echo, LOGGER +from ruamel import yaml + +sys.setrecursionlimit(1000000) + + +class data_progress: + def __init__(self, down_load, time_start): + self.time_start = time_start + self.down_load = down_load + self.time_percent = 0 + self.switch = True + + def set_switch(self, switch): + self.switch = switch + + def get_switch(self): + return self.switch + + def set_time_percent(self, time_percent): + self.time_percent = time_percent + + def get_time_percent(self): + return self.time_percent + + def progress(self, percent): + if percent > 100: + percent = 100 + end = time.time() + if percent != 100: + print(f"\r{self.down_load} %.f%s [%s] running" % (percent, '%', self.timer(end - self.time_start)), + flush=True, end='') + else: + print(f"\r{self.down_load} %.f%s [%s] success" % (percent, '%', self.timer(end - self.time_start)), + flush=True, end='') + + @staticmethod + def timer(times): + hours, rem = divmod(times, 3600) + minutes, seconds = divmod(rem, 60) + return "{:0>2}:{:0>2}:{:0>2}".format(int(hours), int(minutes), int(seconds)) + + +def remove_file(path): + os.remove(path) + + +def id_encryption(encryption_type, start_num, end_num): + if encryption_type == 'md5': + return [hashlib.md5(bytes(str(value), encoding='utf-8')).hexdigest() for value in range(start_num, end_num)] + elif encryption_type == 'sha256': + return [hashlib.sha256(bytes(str(value), encoding='utf-8')).hexdigest() for value in range(start_num, end_num)] + else: + return [str(value) for value in range(start_num, end_num)] + + +def get_big_data(guest_data_size, host_data_size, guest_feature_num, host_feature_num, include_path, host_data_type, + conf: Config, encryption_type, match_rate, sparsity, force, split_host, output_path): + global big_data_dir + + def list_tag_value(feature_nums, head): + # data = '' + # for f in range(feature_nums): + # data 
+= head[f] + ':' + str(round(np.random.randn(), 4)) + ";" + # return data[:-1] + return ";".join([head[k] + ':' + str(round(v, 4)) for k, v in enumerate(np.random.randn(feature_nums))]) + + def list_tag(feature_nums, data_list): + data = '' + for f in range(feature_nums): + data += random.choice(data_list) + ";" + return data[:-1] + + def _generate_tag_value_data(data_path, start_num, end_num, feature_nums, progress): + data_num = end_num - start_num + section_data_size = round(data_num / 100) + iteration = round(data_num / section_data_size) + head = ['x' + str(i) for i in range(feature_nums)] + for batch in range(iteration + 1): + progress.set_time_percent(batch) + output_data = pd.DataFrame(columns=["id"]) + if section_data_size * (batch + 1) <= data_num: + output_data["id"] = id_encryption(encryption_type, section_data_size * batch + start_num, + section_data_size * (batch + 1) + start_num) + slicing_data_size = section_data_size + elif section_data_size * batch < data_num: + output_data['id'] = id_encryption(encryption_type, section_data_size * batch + start_num, end_num) + slicing_data_size = data_num - section_data_size * batch + else: + break + feature = [list_tag_value(feature_nums, head) for i in range(slicing_data_size)] + output_data['feature'] = feature + output_data.to_csv(data_path, mode='a+', index=False, header=False) + + def _generate_dens_data(data_path, start_num, end_num, feature_nums, label_flag, progress): + if label_flag: + head_1 = ['id', 'y'] + else: + head_1 = ['id'] + data_num = end_num - start_num + head_2 = ['x' + str(i) for i in range(feature_nums)] + df_data_1 = pd.DataFrame(columns=head_1) + head_data = pd.DataFrame(columns=head_1 + head_2) + head_data.to_csv(data_path, mode='a+', index=False) + section_data_size = round(data_num / 100) + iteration = round(data_num / section_data_size) + for batch in range(iteration + 1): + progress.set_time_percent(batch) + if section_data_size * (batch + 1) <= data_num: + df_data_1["id"] = 
id_encryption(encryption_type, section_data_size * batch + start_num, + section_data_size * (batch + 1) + start_num) + slicing_data_size = section_data_size + elif section_data_size * batch < data_num: + df_data_1 = pd.DataFrame(columns=head_1) + df_data_1["id"] = id_encryption(encryption_type, section_data_size * batch + start_num, end_num) + slicing_data_size = data_num - section_data_size * batch + else: + break + if label_flag: + df_data_1["y"] = [round(np.random.random()) for x in range(slicing_data_size)] + feature = np.random.randint(-10000, 10000, size=[slicing_data_size, feature_nums]) / 10000 + df_data_2 = pd.DataFrame(feature, columns=head_2) + output_data = pd.concat([df_data_1, df_data_2], axis=1) + output_data.to_csv(data_path, mode='a+', index=False, header=False) + + def _generate_tag_data(data_path, start_num, end_num, feature_nums, sparsity, progress): + data_num = end_num - start_num + section_data_size = round(data_num / 100) + iteration = round(data_num / section_data_size) + valid_set = [x for x in range(2019120799, 2019120799 + round(feature_nums / sparsity))] + data = list(map(str, valid_set)) + for batch in range(iteration + 1): + progress.set_time_percent(batch) + output_data = pd.DataFrame(columns=["id"]) + if section_data_size * (batch + 1) <= data_num: + output_data["id"] = id_encryption(encryption_type, section_data_size * batch + start_num, + section_data_size * (batch + 1) + start_num) + slicing_data_size = section_data_size + elif section_data_size * batch < data_num: + output_data["id"] = id_encryption(encryption_type, section_data_size * batch + start_num, end_num) + slicing_data_size = data_num - section_data_size * batch + else: + break + feature = [list_tag(feature_nums, data_list=data) for i in range(slicing_data_size)] + output_data['feature'] = feature + output_data.to_csv(data_path, mode='a+', index=False, header=False) + + def data_save(data_info, table_names, namespaces, partition_list): + data_count = 0 + for idx, 
data_name in enumerate(data_info.keys()): + label_flag = True if 'guest' in data_info[data_name] else False + data_type = 'dense' if 'guest' in data_info[data_name] else host_data_type + if split_host and ('host' in data_info[data_name]): + host_end_num = int(np.ceil(host_data_size / len(data_info))) * (data_count + 1) if np.ceil( + host_data_size / len(data_info)) * (data_count + 1) <= host_data_size else host_data_size + host_start_num = int(np.ceil(host_data_size / len(data_info))) * data_count + data_count += 1 + else: + host_end_num = host_data_size + host_start_num = 0 + out_path = os.path.join(str(big_data_dir), data_name) + if os.path.exists(out_path) and os.path.isfile(out_path): + if force: + remove_file(out_path) + else: + echo.echo('{} Already exists'.format(out_path)) + continue + data_i = (idx + 1) / len(data_info) + downLoad = f'dataget [{"#" * int(24 * data_i)}{"-" * (24 - int(24 * data_i))}] {idx + 1}/{len(data_info)}' + start = time.time() + progress = data_progress(downLoad, start) + thread = threading.Thread(target=run, args=[progress]) + thread.start() + + try: + if 'guest' in data_info[data_name]: + _generate_dens_data(out_path, guest_start_num, guest_end_num, + guest_feature_num, label_flag, progress) + else: + # if data_type == 'tag' and not parallelize: + if data_type == 'tag': + _generate_tag_data(out_path, host_start_num, host_end_num, host_feature_num, sparsity, progress) + # elif data_type == 'tag_value' and not parallelize: + elif data_type == 'tag_value': + _generate_tag_value_data(out_path, host_start_num, host_end_num, host_feature_num, progress) + # elif data_type == 'dense' and not parallelize: + elif data_type == 'dense': + _generate_dens_data(out_path, host_start_num, host_end_num, + host_feature_num, label_flag, progress) + + progress.set_switch(False) + time.sleep(1) + except Exception: + exception_id = uuid.uuid1() + echo.echo(f"exception_id={exception_id}") + LOGGER.exception(f"exception id: {exception_id}") + finally: + 
progress.set_switch(False) + echo.stdout_newline() + + def run(p): + while p.get_switch(): + time.sleep(1) + p.progress(p.get_time_percent()) + + if not 0 < match_rate <= 1: + raise ValueError(f"match_rate must be in the range (0, 1], please check match_rate: {match_rate}") + guest_start_num = host_data_size - int(guest_data_size * match_rate) + guest_end_num = guest_start_num + guest_data_size + + if os.path.isfile(include_path): + with include_path.open("r") as f: + testsuite_config = yaml.safe_load(f) + else: + raise Exception(f'Input file error, please check {include_path}.') + try: + if output_path is not None: + big_data_dir = os.path.abspath(output_path) + else: + big_data_dir = os.path.abspath(conf.cache_directory) + except Exception: + raise Exception('{} path does not exist'.format(big_data_dir)) + data_set = {} + table_name_list = [] + table_namespace_list = [] + partition_list = [] + for upload_dict in testsuite_config.get('data'): + data_set[os.path.basename(upload_dict.get('file'))] = upload_dict.get('role') + table_name_list.append(upload_dict.get('table_name')) + table_namespace_list.append(upload_dict.get('namespace')) + partition_list.append(upload_dict.get('partitions', 8)) + + data_save( + data_info=data_set, + table_names=table_name_list, + namespaces=table_namespace_list, + partition_list=partition_list) + echo.echo(f'Data storage address, please check {big_data_dir}') diff --git a/python/fate_test/scripts/performance_cli.py b/python/fate_test/scripts/performance_cli.py new file mode 100644 index 0000000..dbd981a --- /dev/null +++ b/python/fate_test/scripts/performance_cli.py @@ -0,0 +1,255 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import glob +import os +import time +import uuid +from datetime import timedelta +from inspect import signature + +import click +from fate_test._client import Clients +from fate_test._config import Config +from fate_test._io import LOGGER, echo +from fate_test._parser import PerformanceSuite +from fate_test.scripts._options import SharedOptions +from fate_test.scripts._utils import _load_testsuites, _upload_data, _delete_data, _load_module_from_script +from fate_test.utils import TxtStyle, parse_job_time_info, pretty_time_info_summary +from prettytable import PrettyTable, ORGMODE +from ruamel import yaml + + +@click.command("performance") +@click.option('-t', '--job-type', type=click.Choice(['intersect', 'intersect_multi', 'hetero_lr', 'hetero_sbt']), + help="Select the job type, you can also set through include") +@click.option('-i', '--include', type=click.Path(exists=True), multiple=True, metavar="", + help="include *performance.yaml under these paths") +@click.option('-m', '--timeout', type=int, + help="maximum running time of job") +@click.option('-e', '--epochs', type=int, help="When the algorithm model is LR, the number of iterations is set") +@click.option('-d', '--max-depth', type=int, + help="When the algorithm model is SecureBoost, set the number of model layers") +@click.option('-nt', '--num-trees', type=int, help="When the algorithm model is SecureBoost, set the number of trees") +@click.option('-p', '--task-cores', type=int, help="processors per node") +@click.option('-s', '--storage-tag', type=str, + help="tag for storing performance time 
consuming, for future comparison") +@click.option('-v', '--history-tag', type=str, multiple=True, + help="Extract performance time consuming from history tags for comparison") +@click.option("--skip-data", is_flag=True, default=False, + help="skip uploading data specified in testsuite") +@click.option("--provider", type=str, + help="Select the fate version, for example: fate@1.7") +@click.option("--disable-clean-data", "clean_data", flag_value=False, default=None) +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def run_task(ctx, job_type, include, timeout, epochs, + max_depth, num_trees, task_cores, storage_tag, history_tag, skip_data, clean_data, provider, **kwargs): + """ + Test the performance of big data tasks, alias: bp + """ + ctx.obj.update(**kwargs) + ctx.obj.post_process() + config_inst = ctx.obj["config"] + if ctx.obj["extend_sid"] is not None: + config_inst.extend_sid = ctx.obj["extend_sid"] + if task_cores is not None: + config_inst.update_conf(task_cores=task_cores) + if timeout is not None: + config_inst.update_conf(timeout=timeout) + """if ctx.obj["auto_increasing_sid"] is not None: + config_inst.auto_increasing_sid = ctx.obj["auto_increasing_sid"]""" + namespace = ctx.obj["namespace"] + yes = ctx.obj["yes"] + data_namespace_mangling = ctx.obj["namespace_mangling"] + if clean_data is None: + clean_data = config_inst.clean_data + + def get_perf_template(conf: Config, job_type): + perf_dir = os.path.join(os.path.abspath(conf.perf_template_dir) + '/' + job_type + '/' + "*performance.yaml") + return glob.glob(perf_dir) + + if not include: + include = get_perf_template(config_inst, job_type) + # prepare output dir and json hooks + # _add_replace_hook(replace) + + echo.welcome() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + echo.echo("loading testsuites:") + suites = _load_testsuites(includes=include, excludes=tuple(), glob=None, provider=provider, + suffix="performance.yaml", suite_type="performance") + for i, suite in 
enumerate(suites): + echo.echo(f"\tdataset({len(suite.dataset)}) pipeline jobs({len(suite.pipeline_jobs)}) {suite.path}") + + if not yes and not click.confirm("running?"): + return + + echo.stdout_newline() + client = Clients(config_inst) + + for i, suite in enumerate(suites): + # noinspection PyBroadException + try: + start = time.time() + echo.echo(f"[{i + 1}/{len(suites)}]start at {time.strftime('%Y-%m-%d %X')} {suite.path}", fg='red') + + if not skip_data: + try: + _upload_data(client, suite, config_inst) + except Exception as e: + raise RuntimeError(f"exception occur while uploading data for {suite.path}") from e + + echo.stdout_newline() + try: + job_time_info = _run_performance_jobs(config_inst, suite, namespace, data_namespace_mangling, + client, + epochs, max_depth, num_trees) + except Exception as e: + raise RuntimeError(f"exception occur while running pipeline jobs for {suite.path}") from e + + echo.echo(f"[{i + 1}/{len(suites)}]elapse {timedelta(seconds=int(time.time() - start))}", fg='red') + if not skip_data and clean_data: + _delete_data(client, suite) + # echo.echo(suite.pretty_final_summary(job_time_info), fg='red') + all_summary = [] + compare_summary = [] + for job_name, job_time in job_time_info.items(): + performance_dir = "/".join( + [os.path.join(os.path.abspath(config_inst.cache_directory), + 'benchmark_history', "performance.yaml")]) + fate_version = client["guest_0"].get_version() + # fate_version = "beta-2.0.0" + if history_tag: + history_tag = ["_".join([i, job_name]) for i in history_tag] + history_compare_result = comparison_quality(job_name, + history_tag, + performance_dir, + job_time["time_summary"]) + compare_summary.append(history_compare_result) + if storage_tag: + storage_tag = "_".join(['FATE', fate_version, storage_tag, job_name]) + save_quality(storage_tag, performance_dir, job_time["time_summary"]) + res_str = pretty_time_info_summary(job_time, job_name) + all_summary.append(res_str) + echo.echo("\n".join(all_summary)) + 
echo.echo("#" * 60) + echo.echo("\n".join(compare_summary)) + + except Exception: + exception_id = uuid.uuid1() + echo.echo(f"exception in {suite.path}, exception_id={exception_id}") + LOGGER.exception(f"exception id: {exception_id}") + finally: + echo.stdout_newline() + + echo.farewell() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + + +@LOGGER.catch +def _run_performance_jobs(config: Config, suite: PerformanceSuite, namespace: str, + data_namespace_mangling: bool, client, epochs, max_depth, num_trees): + # pipeline demo goes here + job_n = len(suite.pipeline_jobs) + fate_base = config.fate_base + PYTHONPATH = os.environ.get('PYTHONPATH', '') + ":" + os.path.join(fate_base, "python") + os.environ['PYTHONPATH'] = PYTHONPATH + job_time_history = {} + for j, job in enumerate(suite.pipeline_jobs): + try: + echo.echo(f"Running [{j + 1}/{job_n}] job: {job.job_name}") + job_name, script_path, conf_path = job.job_name, job.script_path, job.conf_path + param = Config.load_from_file(conf_path) + if epochs is not None: + param['epochs'] = epochs + if max_depth is not None: + param['max_depth'] = max_depth + if num_trees is not None: + param['num_trees'] = num_trees + + mod = _load_module_from_script(script_path) + input_params = signature(mod.main).parameters + # local script + if len(input_params) == 1: + job_id = mod.main(param=param) + elif len(input_params) == 2: + job_id = mod.main(config=config, param=param) + # pipeline script + elif len(input_params) == 3: + if data_namespace_mangling: + job_id = mod.main(config=config, param=param, namespace=f"_{namespace}") + else: + job_id = mod.main(config=config, param=param) + else: + job_id = mod.main() + echo.echo(f"[{j + 1}/{job_n}] job: {job.job_name} Success!\n") + ret_msg = client["guest_0"].query_job(job_id=job_id, + role="guest", + party_id=config.parties.guest[0]).get("data") + time_summary = parse_job_time_info(ret_msg) + job_time_history[job_name] = {"job_id": job_id, "time_summary": time_summary} +
echo.echo(f"[{j + 1}/{job_n}] job: {job.job_name} time info: {time_summary}\n") + + except Exception as e: + exception_id = uuid.uuid1() + echo.echo(f"exception while running [{j + 1}/{job_n}] job, exception_id={exception_id}", err=True, + fg='red') + LOGGER.exception(f"exception id: {exception_id}, error message: \n{e}") + continue + return job_time_history + + +def comparison_quality(group_name, history_tags, history_info_dir, time_consuming): + assert os.path.exists(history_info_dir), f"Please check the {history_info_dir} Is it deleted" + with open(history_info_dir, 'r') as f: + benchmark_quality = yaml.safe_load(f) + benchmark_performance = {} + table = PrettyTable() + table.set_style(ORGMODE) + table.field_names = ["Script Model Name", "component", "time consuming"] + for history_tag in history_tags: + for tag in benchmark_quality: + if '_'.join(tag.split("_")[2:]) == history_tag: + benchmark_performance[tag] = benchmark_quality[tag] + if benchmark_performance is not None: + benchmark_performance[group_name] = time_consuming + + for script_model_name in benchmark_performance: + time_history = benchmark_performance[script_model_name] + for cpn in time_history.get("cpn_list"): + table.add_row([f"{script_model_name}"] + + [f"{cpn}"] + + [f"{TxtStyle.FIELD_VAL}{timedelta(seconds=time_history.get(cpn))}{TxtStyle.END}"]) + # print("\n") + # print(table.get_string(title=f"{TxtStyle.TITLE}Performance comparison results{TxtStyle.END}")) + # print("#" * 60) + return table.get_string(title=f"{TxtStyle.TITLE}Performance comparison results{TxtStyle.END}") + + +def save_quality(storage_tag, save_dir, time_consuming): + os.makedirs(os.path.dirname(save_dir), exist_ok=True) + if os.path.exists(save_dir): + with open(save_dir, 'r') as f: + benchmark_quality = yaml.safe_load(f) + else: + benchmark_quality = {} + benchmark_quality.update({storage_tag: time_consuming}) + try: + with open(save_dir, 'w') as fp: + yaml.dump(benchmark_quality, fp) + print("\n" + "Storage successful, 
please check: ", save_dir) + except Exception: + print("\n" + "Storage failed, please check: ", save_dir) diff --git a/python/fate_test/scripts/testsuite_cli.py b/python/fate_test/scripts/testsuite_cli.py new file mode 100644 index 0000000..c1d3ffd --- /dev/null +++ b/python/fate_test/scripts/testsuite_cli.py @@ -0,0 +1,174 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import time +import uuid +from datetime import timedelta + +import click +from fate_test import _config +from fate_test._client import Clients +from fate_test._config import Config +from fate_test._io import LOGGER, echo +from fate_test._parser import Testsuite, non_success_summary +from fate_test.scripts._options import SharedOptions +from fate_test.scripts._utils import _load_testsuites, _upload_data, _delete_data, _load_module_from_script + +""" +@click.option('-uj', '--update-job-parameters', default="{}", type=JSON_STRING, + help="a json string represents mapping for replacing fields in conf.job_parameters") +@click.option('-uc', '--update-component-parameters', default="{}", type=JSON_STRING, + help="a json string represents mapping for replacing fields in conf.component_parameters") +@click.option('-m', '--timeout', type=int, default=3600, help="maximun running time of job") +@click.option('-p', '--task-cores', type=int, help="processors per node") +""" + + +@click.command("suite") +@click.option('-i', '--include', required=True, 
type=click.Path(exists=True), multiple=True, metavar="", + help="include *testsuite.yaml under these paths") +@click.option('-e', '--exclude', type=click.Path(exists=True), multiple=True, + help="exclude *testsuite.yaml under these paths") +@click.option('-p', '--task-cores', type=int, help="processors per node") +@click.option('-m', '--timeout', type=int, + help="maximum running time of job") +@click.option("-g", '--glob', type=str, + help="glob string to filter sub-directory of path specified by ") +@click.option("--skip-jobs", is_flag=True, default=False, + help="skip pipeline jobs defined in testsuite") +@click.option("--skip-data", is_flag=True, default=False, + help="skip uploading data specified in testsuite") +@click.option("--data-only", is_flag=True, default=False, + help="upload data only") +@click.option("--provider", type=str, + help="Select the fate version, for example: fate@2.0-beta") +@click.option("--disable-clean-data", "clean_data", flag_value=False, default=None) +@click.option("--enable-clean-data", "clean_data", flag_value=True, default=None) +@SharedOptions.get_shared_options(hidden=True) +@click.pass_context +def run_suite(ctx, include, exclude, glob, + skip_jobs, skip_data, data_only, clean_data, provider, task_cores, timeout, **kwargs): + """ + process testsuite + """ + ctx.obj.update(**kwargs) + ctx.obj.post_process() + config_inst = ctx.obj["config"] + if ctx.obj["extend_sid"] is not None: + config_inst.extend_sid = ctx.obj["extend_sid"] + if task_cores is not None: + config_inst.update_conf(task_cores=task_cores) + if timeout is not None: + config_inst.update_conf(timeout=timeout) + + """if ctx.obj["auto_increasing_sid"] is not None: + config_inst.auto_increasing_sid = ctx.obj["auto_increasing_sid"]""" + if clean_data is None: + clean_data = config_inst.clean_data + namespace = ctx.obj["namespace"] + yes = ctx.obj["yes"] + data_namespace_mangling = ctx.obj["namespace_mangling"] + # prepare output dir and json hooks + # 
_add_replace_hook(replace) + echo.welcome() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + echo.echo("loading testsuites:") + suites = _load_testsuites(includes=include, excludes=exclude, glob=glob, provider=provider) + for suite in suites: + _config.jobs_num += len(suite.pipeline_jobs) + echo.echo(f"\tdataset({len(suite.dataset)}) " + f"pipeline jobs ({len(suite.pipeline_jobs)}) {suite.path}") + if not yes and not click.confirm("running?"): + return + + echo.stdout_newline() + # with Clients(config_inst) as client: + client = Clients(config_inst) + + for i, suite in enumerate(suites): + # noinspection PyBroadException + try: + start = time.time() + echo.echo(f"[{i + 1}/{len(suites)}]start at {time.strftime('%Y-%m-%d %X')} {suite.path}", fg='red') + if not skip_data: + try: + _upload_data(client, suite, config_inst) + except Exception as e: + raise RuntimeError(f"exception occur while uploading data for {suite.path}") from e + if data_only: + continue + + if not skip_jobs: + try: + time_consuming = _run_pipeline_jobs(config_inst, suite, namespace, data_namespace_mangling) + except Exception as e: + raise RuntimeError(f"exception occur while running pipeline jobs for {suite.path}") from e + + if not skip_data and clean_data: + _delete_data(client, suite) + echo.echo(f"[{i + 1}/{len(suites)}]elapse {timedelta(seconds=int(time.time() - start))}", fg='red') + if not skip_jobs: + suite_file = str(suite.path).split("/")[-1] + echo.echo(suite.pretty_final_summary(time_consuming, suite_file)) + + except Exception: + exception_id = uuid.uuid1() + echo.echo(f"exception in {suite.path}, exception_id={exception_id}") + LOGGER.exception(f"exception id: {exception_id}") + finally: + echo.stdout_newline() + non_success_summary() + echo.farewell() + echo.echo(f"testsuite namespace: {namespace}", fg='red') + + +def _run_pipeline_jobs(config: Config, suite: Testsuite, namespace: str, data_namespace_mangling: bool): + # pipeline demo goes here + job_n = 
len(suite.pipeline_jobs) + time_list = [] + for i, pipeline_job in enumerate(suite.pipeline_jobs): + echo.echo(f"Running [{i + 1}/{job_n}] job: {pipeline_job.job_name}") + + def _raise(err_msg, status="failed"): + exception_id = str(uuid.uuid1()) + suite.update_status(job_name=job_name, exception_id=exception_id, status=status) + echo.file(f"exception({exception_id}), error message:\n{err_msg}") + + job_name, script_path = pipeline_job.job_name, pipeline_job.script_path + mod = _load_module_from_script(script_path) + start = time.time() + try: + if data_namespace_mangling: + try: + mod.main(config=config, namespace=f"_{namespace}") + suite.update_status(job_name=job_name, status="success") + time_list.append(time.time() - start) + + except Exception as e: + _raise(e) + continue + else: + try: + mod.main(config=config) + suite.update_status(job_name=job_name, status="success") + time_list.append(time.time() - start) + except Exception as e: + _raise(e) + continue + except Exception as e: + _raise(e, status="not submitted") + continue + + return [str(int(i)) + "s" for i in time_list] diff --git a/python/fate_test/utils.py b/python/fate_test/utils.py new file mode 100644 index 0000000..2176109 --- /dev/null +++ b/python/fate_test/utils.py @@ -0,0 +1,404 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import math +import os +from datetime import timedelta + +import numpy as np +from colorama import init, deinit, Fore, Style +from fate_test._io import echo +from prettytable import PrettyTable, ORGMODE +from ruamel import yaml + +SCRIPT_METRICS = "script_metrics" +DISTRIBUTION_METRICS = "distribution_metrics" +ALL = "all" +RELATIVE = "relative" +ABSOLUTE = "absolute" + + +class TxtStyle: + TRUE_VAL = Fore.GREEN + FALSE_VAL = Fore.RED + Style.BRIGHT + TITLE = Fore.BLUE + FIELD_VAL = Fore.YELLOW + DATA_FIELD_VAL = Fore.CYAN + END = Style.RESET_ALL + + +def show_data(data): + data_table = PrettyTable() + data_table.set_style(ORGMODE) + data_table.field_names = ["Data", "Information"] + for name, table_name in data.items(): + row = [name, f"{TxtStyle.DATA_FIELD_VAL}{table_name}{TxtStyle.END}"] + data_table.add_row(row) + echo.echo(data_table.get_string(title=f"{TxtStyle.TITLE}Data Summary{TxtStyle.END}")) + echo.echo("\n") + + +def _get_common_metrics(**results): + common_metrics = None + for result in results.values(): + if common_metrics is None: + common_metrics = set(result.keys()) + else: + common_metrics = common_metrics & result.keys() + if SCRIPT_METRICS in common_metrics: + common_metrics.remove(SCRIPT_METRICS) + return list(common_metrics) + + +def _filter_results(metrics, **results): + filtered_results = {} + for model_name, result in results.items(): + model_result = [result.get(metric, None) for metric in metrics] + if None in model_result: + continue + filtered_results[model_name] = model_result + return filtered_results + + +def style_table(txt): + colored_txt = txt.replace("True", f"{TxtStyle.TRUE_VAL}True{TxtStyle.END}") + colored_txt = colored_txt.replace("False", f"{TxtStyle.FALSE_VAL}False{TxtStyle.END}") + return colored_txt + + +def evaluate_almost_equal(metrics, results, abs_tol=None, rel_tol=None): + """ + Evaluate for each given metric if values in results are almost equal + Parameters + ---------- + metrics: List[str], metrics names + 
results: dict, results to be evaluated + abs_tol: float, absolute error tolerance + rel_tol: float, relative difference tolerance + Returns + ------- + bool, return True if all metrics in results are almost equal + """ + # return False if empty + if len(metrics) == 0: + return False + eval_summary = {} + for i, metric in enumerate(metrics): + v_eval = [res[i] for res in results.values()] + first_v = v_eval[0] + if metric == SCRIPT_METRICS: + continue + if abs_tol is not None and rel_tol is not None: + eval_summary[metric] = all(math.isclose(v, first_v, abs_tol=abs_tol, rel_tol=rel_tol) for v in v_eval) + elif abs_tol is not None: + eval_summary[metric] = all(math.isclose(v, first_v, abs_tol=abs_tol) for v in v_eval) + elif rel_tol is not None: + eval_summary[metric] = all(math.isclose(v, first_v, rel_tol=rel_tol) for v in v_eval) + else: + eval_summary[metric] = all(math.isclose(v, first_v) for v in v_eval) + all_match = all(eval_summary.values()) + return eval_summary, all_match + + +def _distribution_metrics(**results): + filtered_metric_group = _filter_results([DISTRIBUTION_METRICS], **results) + for script, model_results_pair in filtered_metric_group.items(): + metric_results = model_results_pair[0] + common_metrics = _get_common_metrics(**metric_results) + filtered_results = _filter_results(common_metrics, **metric_results) + table = PrettyTable() + table.set_style(ORGMODE) + script_model_names = list(filtered_results.keys()) + table.field_names = ["Script Model Name"] + common_metrics + for script_model_name in script_model_names: + row = [f"{script}-{script_model_name}"] + [f"{TxtStyle.FIELD_VAL}{v}{TxtStyle.END}" for v in + filtered_results[script_model_name]] + table.add_row(row) + echo.echo(table.get_string(title=f"{TxtStyle.TITLE}{script} distribution metrics{TxtStyle.END}")) + echo.echo("\n" + "#" * 60) + + +def match_script_metrics(abs_tol, rel_tol, match_details, **results): + filtered_metric_group = _filter_results([SCRIPT_METRICS], **results) + for 
script, model_results_pair in filtered_metric_group.items(): + metric_results = model_results_pair[0] + common_metrics = _get_common_metrics(**metric_results) + filtered_results = _filter_results(common_metrics, **metric_results) + table = PrettyTable() + table.set_style(ORGMODE) + script_model_names = list(filtered_results.keys()) + table.field_names = ["Script Model Name"] + common_metrics + for script_model_name in script_model_names: + row = [f"{script_model_name}-{script}"] + [f"{TxtStyle.FIELD_VAL}{v}{TxtStyle.END}" for v in + filtered_results[script_model_name]] + table.add_row(row) + echo.echo(table.get_string(title=f"{TxtStyle.TITLE}{script} Script Metrics Summary{TxtStyle.END}")) + _all_match(common_metrics, filtered_results, abs_tol, rel_tol, script, match_details=match_details) + + +def match_metrics(evaluate, group_name, abs_tol=None, rel_tol=None, storage_tag=None, history_tag=None, + fate_version=None, cache_directory=None, match_details=None, **results): + """ + Get metrics + Parameters + ---------- + evaluate: bool, whether to evaluate metrics are almost equal, and include compare results in output report + group_name: str, group name of all models + abs_tol: float, max tolerance of absolute error to consider two metrics to be almost equal + rel_tol: float, max tolerance of relative difference to consider two metrics to be almost equal + storage_tag: str, metrics information storage tag + history_tag: str, historical metrics information comparison tag + fate_version: str, FATE version + cache_directory: str, Storage path of metrics information + match_details: str, Error value display in algorithm comparison + results: dict of model name: metrics + Returns + ------- + match result + """ + init(autoreset=True) + common_metrics = _get_common_metrics(**results) + filtered_results = _filter_results(common_metrics, **results) + table = PrettyTable() + table.set_style(ORGMODE) + model_names = list(filtered_results.keys()) + table.field_names = ["Model 
Name"] + common_metrics + for model_name in model_names: + row = [f"{model_name}-{group_name}"] + [f"{TxtStyle.FIELD_VAL}{v}{TxtStyle.END}" for v in + filtered_results[model_name]] + table.add_row(row) + echo.echo(table.get_string(title=f"{TxtStyle.TITLE}Metrics Summary{TxtStyle.END}")) + + if evaluate and len(filtered_results.keys()) > 1: + _all_match(common_metrics, filtered_results, abs_tol, rel_tol, match_details=match_details) + + _distribution_metrics(**results) + match_script_metrics(abs_tol, rel_tol, match_details, **results) + if history_tag: + history_tag = ["_".join([i, group_name]) for i in history_tag] + comparison_quality(group_name, history_tag, cache_directory, abs_tol, rel_tol, match_details, **results) + if storage_tag: + storage_tag = "_".join(['FATE', fate_version, storage_tag, group_name]) + _save_quality(storage_tag, cache_directory, **results) + deinit() + + +def _match_error(metrics, results): + relative_error_list = [] + absolute_error_list = [] + if len(metrics) == 0: + return False + for i, v in enumerate(metrics): + v_eval = [res[i] for res in results.values()] + absolute_error_list.append(f"{TxtStyle.FIELD_VAL}{abs(max(v_eval) - min(v_eval))}{TxtStyle.END}") + relative_error_list.append( + f"{TxtStyle.FIELD_VAL}{abs((max(v_eval) - min(v_eval)) / max(v_eval))}{TxtStyle.END}") + return relative_error_list, absolute_error_list + + +def _all_match(common_metrics, filtered_results, abs_tol, rel_tol, script=None, match_details=None): + eval_summary, all_match = evaluate_almost_equal(common_metrics, filtered_results, abs_tol, rel_tol) + eval_table = PrettyTable() + eval_table.set_style(ORGMODE) + field_names = ["Metric", "All Match"] + relative_error_list, absolute_error_list = _match_error(common_metrics, filtered_results) + for i, metric in enumerate(eval_summary.keys()): + row = [metric, eval_summary.get(metric)] + if match_details == ALL: + field_names = ["Metric", "All Match", "max_relative_error", "max_absolute_error"] + row += 
[relative_error_list[i], absolute_error_list[i]] + elif match_details == RELATIVE: + field_names = ["Metric", "All Match", "max_relative_error"] + row += [relative_error_list[i]] + elif match_details == ABSOLUTE: + field_names = ["Metric", "All Match", "max_absolute_error"] + row += [absolute_error_list[i]] + eval_table.add_row(row) + eval_table.field_names = field_names + + echo.echo(style_table(eval_table.get_string(title=f"{TxtStyle.TITLE}Match Results{TxtStyle.END}"))) + script = "" if script is None else f"{script} " + if all_match: + echo.echo(f"All {script}Metrics Match: {TxtStyle.TRUE_VAL}{all_match}{TxtStyle.END}") + else: + echo.echo(f"All {script}Metrics Match: {TxtStyle.FALSE_VAL}{all_match}{TxtStyle.END}") + + +def comparison_quality(group_name, history_tags, cache_directory, abs_tol, rel_tol, match_details, **results): + def regression_group(results_dict): + metric = {} + for k, v in results_dict.items(): + if not isinstance(v, dict): + metric[k] = v + return metric + + def class_group(class_dict): + metric = {} + for k, v in class_dict.items(): + if not isinstance(v, dict): + metric[k] = v + for k, v in class_dict['distribution_metrics'].items(): + metric.update(v) + return metric + + history_info_dir = "/".join([os.path.join(os.path.abspath(cache_directory), 'benchmark_history', + "benchmark_quality.yaml")]) + assert os.path.exists(history_info_dir), f"Please check the {history_info_dir} Is it deleted" + with open(history_info_dir, 'r') as f: + benchmark_quality = yaml.safe_load(f) + regression_metric = {} + regression_quality = {} + class_quality = {} + for history_tag in history_tags: + for tag in benchmark_quality: + if '_'.join(tag.split("_")[2:]) == history_tag and SCRIPT_METRICS in results["FATE"]: + regression_metric[tag] = regression_group(benchmark_quality[tag]['FATE']) + for key, value in _filter_results([SCRIPT_METRICS], **benchmark_quality[tag])['FATE'][0].items(): + regression_quality["_".join([tag, key])] = value + elif 
'_'.join(tag.split("_")[2:]) == history_tag and DISTRIBUTION_METRICS in results["FATE"]: + class_quality[tag] = class_group(benchmark_quality[tag]['FATE']) + + if SCRIPT_METRICS in results["FATE"] and regression_metric: + regression_metric[group_name] = regression_group(results['FATE']) + metric_compare(abs_tol, rel_tol, match_details, **regression_metric) + for key, value in _filter_results([SCRIPT_METRICS], **results)['FATE'][0].items(): + regression_quality["_".join([group_name, key])] = value + metric_compare(abs_tol, rel_tol, match_details, **regression_quality) + echo.echo("\n" + "#" * 60) + elif DISTRIBUTION_METRICS in results["FATE"] and class_quality: + + class_quality[group_name] = class_group(results['FATE']) + metric_compare(abs_tol, rel_tol, match_details, **class_quality) + echo.echo("\n" + "#" * 60) + + +def metric_compare(abs_tol, rel_tol, match_details, **metric_results): + common_metrics = _get_common_metrics(**metric_results) + filtered_results = _filter_results(common_metrics, **metric_results) + table = PrettyTable() + table.set_style(ORGMODE) + script_model_names = list(filtered_results.keys()) + table.field_names = ["Script Model Name"] + common_metrics + for script_model_name in script_model_names: + table.add_row([f"{script_model_name}"] + + [f"{TxtStyle.FIELD_VAL}{v}{TxtStyle.END}" for v in filtered_results[script_model_name]]) + print( + table.get_string(title=f"{TxtStyle.TITLE}Comparison results of all metrics of Script Model FATE{TxtStyle.END}")) + _all_match(common_metrics, filtered_results, abs_tol, rel_tol, match_details=match_details) + + +def _save_quality(storage_tag, cache_directory, **results): + save_dir = "/".join([os.path.join(os.path.abspath(cache_directory), 'benchmark_history', "benchmark_quality.yaml")]) + os.makedirs(os.path.dirname(save_dir), exist_ok=True) + if os.path.exists(save_dir): + with open(save_dir, 'r') as f: + benchmark_quality = yaml.safe_load(f) + else: + benchmark_quality = {} + if storage_tag in 
benchmark_quality: + print("This tag already exists in the history and will be updated to the record information.") + benchmark_quality.update({storage_tag: results}) + try: + with open(save_dir, 'w') as fp: + yaml.dump(benchmark_quality, fp) + print("Storage success, please check: ", save_dir) + except Exception: + print("Storage failed, please check: ", save_dir) + + +def parse_summary_result(rs_dict): + for model_key in rs_dict: + rs_content = rs_dict[model_key] + if 'test_set' in rs_content: + metric_result = rs_content['test_set'] + elif 'validate_set' in rs_content: + metric_result = rs_content['validate_set'] + else: + metric_result = rs_content['train_set'] + return extract_and_flatten_summary_metric(metric_result) + + +def extract_and_flatten_summary_metric(metric_dict_list): + flatten_metric_summary = {} + for metric_group in metric_dict_list: + if isinstance(metric_group, dict): + metric_name = metric_group['metric'] + metric_val = metric_group['val'] + if isinstance(metric_val, float) or isinstance(metric_val, int): + flatten_metric_summary[metric_name] = metric_val + elif isinstance(metric_group, list): + for metric_subset in metric_group: + metric_name = metric_subset['metric'] + metric_val = metric_subset['val'] + if isinstance(metric_val, float) or isinstance(metric_val, int): + flatten_metric_summary[metric_name] = metric_val + else: + raise ValueError(f"Invalid metric group: {metric_group}") + return flatten_metric_summary + + +def extract_data(df, col_name, convert_float=True, keep_id=False): + """ + component output data to numpy array + Parameters + ---------- + df: dataframe + col_name: column to extract + convert_float: whether to convert extracted value to float value + keep_id: whether to keep id + Returns + ------- + array of extracted data, optionally with id + """ + if keep_id: + if convert_float: + df[col_name] = df[col_name].to_numpy().astype(np.float64) + + return df[[df.columns[0], col_name]].to_numpy() + else: + return
df[col_name].to_numpy().astype(np.float64) + + +def parse_job_time_info(job_time_info): + time_info_summary = {} + cpn_list = [] + for cpn in job_time_info: + cpn_name = cpn.get("task_name") + # convert milliseconds to seconds + cpn_elapsed = round(cpn.get("elapsed") / 1000) + time_info_summary[cpn_name] = cpn_elapsed + cpn_list.append(cpn_name) + time_info_summary["cpn_list"] = cpn_list + return time_info_summary + + +def pretty_time_info_summary(time_info_summary, job_name): + table = PrettyTable() + table.set_style(ORGMODE) + field_names = ["component name", "time consuming"] + table.field_names = field_names + time_summary = time_info_summary.get("time_summary", {}) + for cpn_name in time_summary["cpn_list"]: + cpn_elapse = time_summary.get(cpn_name) + table.add_row( + [ + f"{cpn_name}", + f"{TxtStyle.FIELD_VAL}{timedelta(seconds=cpn_elapse)}{TxtStyle.END}", + ] + ) + + return table.get_string(title=f"{TxtStyle.TITLE}Component Time Summary: " + f"{job_name}({time_info_summary['job_id']}){TxtStyle.END}") diff --git a/python/requirements.txt b/python/requirements.txt new file mode 100644 index 0000000..e76ea64 --- /dev/null +++ b/python/requirements.txt @@ -0,0 +1,8 @@ +colorama>0.4.4 +prettytable<2.0.0,>=1.0.0 +loguru>=0.6.0 +pandas==2.0.3 +ruamel-yaml==0.16 +click>=8.0.0 +xgboost==1.7.6 +scikit-learn diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 0000000..c5cc89d --- /dev/null +++ b/python/setup.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +from setuptools import setup + +packages = ["fate_test", "fate_test.scripts"] + +package_data = {"": ["*"]} + +install_requires = [ + "click>=8.0.0", + "loguru>=0.6.0", + "pandas>=2.0.3", + "prettytable>=1.0.0,<2.0.0", + "ruamel.yaml>=0.16,<0.17.0", + 'colorama>0.4.4', + 'xgboost>=1.7.6', + "scikit-learn" +] + +entry_points = {"console_scripts": ["fate_test = fate_test.scripts.cli:cli"]} + +setup_kwargs = { + "name": "fate-test", + "version": "2.0.0-beta", + "description": "test tools for FATE", + 
"long_description": 'FATE Test\n=========\n\nA collection of useful tools to running FATE\'s test.\n\n.. image:: images/tutorial.gif\n :align: center\n :alt: tutorial\n\nquick start\n-----------\n\n1. (optional) create virtual env\n\n .. code-block:: bash\n\n python -m venv venv\n source venv/bin/activate\n pip install -U pip\n\n\n2. install fate_test\n\n .. code-block:: bash\n\n pip install fate_test\n fate_test --help\n\n\n3. edit default fate_test_config.yaml\n\n .. code-block:: bash\n\n # edit priority config file with system default editor\n # filling some field according to comments\n fate_test config edit\n\n4. configure FATE-Pipeline and FATE-Flow Commandline server setting\n\n.. code-block:: bash\n\n # configure FATE-Pipeline server setting\n pipeline init --port 9380 --ip 127.0.0.1\n # configure FATE-Flow Commandline server setting\n flow init --port 9380 --ip 127.0.0.1\n\n5. run some fate_test suite\n\n .. code-block:: bash\n\n fate_test suite -i \n\n\n6. run some fate_test benchmark\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i \n\n7. useful logs or exception will be saved to logs dir with namespace shown in last step\n\ndevelop install\n---------------\nIt is more convenient to use the editable mode during development: replace step 2 with flowing steps\n\n.. code-block:: bash\n\n pip install -e ${FATE}/python/fate_client && pip install -e ${FATE}/python/fate_test\n\n\n\ncommand types\n-------------\n\n- suite: used for running testsuites, collection of FATE jobs\n\n .. code-block:: bash\n\n fate_test suite -i \n\n\n- benchmark-quality used for comparing modeling quality between FATE and other machine learning systems\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i \n\n\n\nconfiguration by examples\n--------------------------\n\n1. no need ssh tunnel:\n\n - 9999, service: service_a\n - 10000, service: service_b\n\n and both service_a, service_b can be requested directly:\n\n .. 
code-block:: yaml\n\n work_mode: 1 # 0 for standalone, 1 for cluster\n data_base_dir: \n parties:\n guest: [10000]\n host: [9999, 10000]\n arbiter: [9999]\n services:\n - flow_services:\n - {address: service_a, parties: [9999]}\n - {address: service_b, parties: [10000]}\n\n2. need ssh tunnel:\n\n - 9999, service: service_a\n - 10000, service: service_b\n\n service_a can be requested directly while service_b cannot,\n but service_b can be requested from another node, say B:\n\n .. code-block:: yaml\n\n work_mode: 0 # 0 for standalone, 1 for cluster\n data_base_dir: \n parties:\n guest: [10000]\n host: [9999, 10000]\n arbiter: [9999]\n services:\n - flow_services:\n - {address: service_a, parties: [9999]}\n - flow_services:\n - {address: service_b, parties: [10000]}\n ssh_tunnel: # optional\n enable: true\n ssh_address: :\n ssh_username: \n ssh_password: # optional\n ssh_priv_key: "~/.ssh/id_rsa"\n\n\nTestsuite\n---------\n\nTestsuite is used for running a collection of jobs in sequence. Data used by jobs can be uploaded before the jobs are\nsubmitted and cleaned up when the jobs finish. This tool is useful for FATE\'s release test.\n\ncommand options\n~~~~~~~~~~~~~~~\n\n.. code-block:: bash\n\n fate_test suite --help\n\n1. include:\n\n .. code-block:: bash\n\n fate_test suite -i \n\n will run testsuites in *path1*\n\n2. exclude:\n\n .. code-block:: bash\n\n fate_test suite -i -e -e ...\n\n will run testsuites in *path1* but not in *path2* and *path3*\n\n3. glob:\n\n .. code-block:: bash\n\n fate_test suite -i -g "hetero*"\n\n will run testsuites in subdirectories of *path1* whose names start with *hetero*\n\n4. replace:\n\n .. code-block:: bash\n\n fate_test suite -i -r \'{"maxIter": 5}\'\n\n will find all key-value pairs with key "maxIter" in `data conf` or `conf` or `dsl` and replace the value with 5\n\n\n5. skip-data:\n\n .. code-block:: bash\n\n fate_test suite -i --skip-data\n\n will run testsuites in *path1* without uploading data specified in *benchmark.json*.\n\n\n6. 
yes:\n\n .. code-block:: bash\n\n fate_test suite -i --yes\n\n will run testsuites in *path1* directly, skipping the confirmation prompt\n\n7. skip-dsl-jobs:\n\n .. code-block:: bash\n\n fate_test suite -i --skip-dsl-jobs\n\n will run testsuites in *path1* but skip all *tasks* in testsuites. This is useful when only pipeline tasks are needed.\n\n8. skip-pipeline-jobs:\n\n .. code-block:: bash\n\n fate_test suite -i --skip-pipeline-jobs\n\n will run testsuites in *path1* but skip all *pipeline tasks* in testsuites. This is useful when only dsl tasks are needed.\n\n\nBenchmark Quality\n------------------\n\nBenchmark-quality is used for comparing modeling quality between FATE\nand other machine learning systems. Benchmark produces a metrics comparison\nsummary for each benchmark job group.\n\n.. code-block:: bash\n\n fate_test benchmark-quality -i examples/benchmark_quality/hetero_linear_regression\n\n.. code-block:: bash\n\n +-------+--------------------------------------------------------------+\n | Data | Name |\n +-------+--------------------------------------------------------------+\n | train | {\'guest\': \'motor_hetero_guest\', \'host\': \'motor_hetero_host\'} |\n | test | {\'guest\': \'motor_hetero_guest\', \'host\': \'motor_hetero_host\'} |\n +-------+--------------------------------------------------------------+\n +------------------------------------+--------------------+--------------------+-------------------------+---------------------+\n | Model Name | explained_variance | r2_score | root_mean_squared_error | mean_squared_error |\n +------------------------------------+--------------------+--------------------+-------------------------+---------------------+\n | local-linear_regression-regression | 0.9035168452250094 | 0.9035070863155368 | 0.31340413289880553 | 0.09822215051805216 |\n | FATE-linear_regression-regression | 0.903146386539082 | 0.9031411831961411 | 0.3139977881119483 | 0.09859461093919596 |\n 
+------------------------------------+--------------------+--------------------+-------------------------+---------------------+\n +-------------------------+-----------+\n | Metric | All Match |\n +-------------------------+-----------+\n | explained_variance | True |\n | r2_score | True |\n | root_mean_squared_error | True |\n | mean_squared_error | True |\n +-------------------------+-----------+\n\ncommand options\n~~~~~~~~~~~~~~~\n\nuse the following command to show the help message\n\n.. code-block:: bash\n\n fate_test benchmark-quality --help\n\n1. include:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i \n\n will run benchmark testsuites in *path1*\n\n2. exclude:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i -e -e ...\n\n will run benchmark testsuites in *path1* but not in *path2* and *path3*\n\n3. glob:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i -g "hetero*"\n\n will run benchmark testsuites in subdirectories of *path1* whose names start with *hetero*\n\n4. tol:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i -t 1e-3\n\n will run benchmark testsuites in *path1* with absolute tolerance of difference between metrics set to 0.001.\n If the absolute difference between metrics is smaller than *tol*, then the metrics are considered\n almost equal. Check benchmark testsuite `writing guide <#benchmark-testsuite>`_ on setting alternative tolerance.\n\n5. skip-data:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i --skip-data\n\n will run benchmark testsuites in *path1* without uploading data specified in *benchmark.json*.\n\n\n6. yes:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i --yes\n\n will run benchmark testsuites in *path1* directly, skipping the confirmation prompt\n\n\nbenchmark testsuite\n~~~~~~~~~~~~~~~~~~~\n\nConfiguration of jobs should be specified in a benchmark testsuite whose file name ends\nwith "\\*benchmark.json". 
For a benchmark testsuite example,\nplease refer to `here <../../examples/benchmark_quality>`_.\n\nA benchmark testsuite includes the following elements:\n\n- data: list of local data to be uploaded before running FATE jobs\n\n - file: path to original data file to be uploaded, should be relative to testsuite or FATE installation path\n - head: whether file includes header\n - partition: number of partitions for data storage\n - table_name: table name in storage\n - namespace: table namespace in storage\n - role: role that uploads the data, as specified in fate_test.config;\n naming format is: "{role_type}_{role_index}", index starts at 0\n\n .. code-block:: json\n\n "data": [\n {\n "file": "examples/data/motor_hetero_host.csv",\n "head": 1,\n "partition": 8,\n "table_name": "motor_hetero_host",\n "namespace": "experiment",\n "role": "host_0"\n }\n ]\n\n- job group: each group includes an arbitrary number of jobs with paths to corresponding script and configuration\n\n - job: name of job to be run, must be unique within each group list\n\n - script: path to `testing script <#testing-script>`_, should be relative to testsuite\n - conf: path to job configuration file for script, should be relative to testsuite\n\n .. code-block:: json\n\n "local": {\n "script": "./local-linr.py",\n "conf": "./linr_config.yaml"\n }\n\n - compare_setting: additional setting for quality metrics comparison, currently only takes ``relative_tol``\n\n If metrics *a* and *b* satisfy *abs(a-b) <= max(relative_tol \\* max(abs(a), abs(b)), absolute_tol)*\n (from `math module `_),\n they are considered almost equal. In the example below, metrics from "local" and "FATE" jobs are\n considered almost equal if their difference is no greater than\n *0.01 \\* max(abs(local_metric), abs(pipeline_metric))*.\n\n .. 
code-block:: json\n\n "linear_regression-regression": {\n "local": {\n "script": "./local-linr.py",\n "conf": "./linr_config.yaml"\n },\n "FATE": {\n "script": "./fate-linr.py",\n "conf": "./linr_config.yaml"\n },\n "compare_setting": {\n "relative_tol": 0.01\n }\n }\n\n\ntesting script\n~~~~~~~~~~~~~~\n\nAll job scripts need to have a ``Main`` function as the entry point for executing jobs; scripts should\nreturn two dictionaries: the first contains data information key-value pairs: {data_type}: {data_name_dictionary};\nthe second contains {metric_name}: {metric_value} key-value pairs for metric comparison.\n\nBy default, the final data summary shows the output from the job named "FATE"; if no such job exists,\ndata information returned by the first job is shown. For clear presentation, we suggest that users follow\nthis general `guideline <../../examples/data/README.md#data-set-naming-rule>`_ for data set naming. For multi-host\ntasks, consider numbering hosts as follows:\n\n::\n\n {\'guest\': \'default_credit_homo_guest\',\n \'host_1\': \'default_credit_homo_host_1\',\n \'host_2\': \'default_credit_homo_host_2\'}\n\nReturned quality metrics of the same key are to be compared.\nNote that only **real-value** metrics can be compared.\n\n- FATE script: ``Main`` always has three inputs:\n\n - config: job configuration, `JobConfig <../fate_client/pipeline/utils/tools.py#L64>`_ object loaded from "fate_test_config.yaml"\n - param: job parameter setting, dictionary loaded from "conf" file specified in benchmark testsuite\n - namespace: namespace suffix, user-given *namespace* or generated timestamp string when using *namespace-mangling*\n\n- non-FATE script: ``Main`` always has one input:\n\n - param: job parameter setting, dictionary loaded from "conf" file specified in benchmark testsuite\n\n\ndata\n----\n\nThe `Data` sub-command is used to upload or delete the datasets specified in testsuites.\n\ncommand options\n~~~~~~~~~~~~~~~\n\n.. 
code-block:: bash\n\n fate_test data --help\n\n1. 
include:\n\n .. code-block:: bash\n\n fate_test data [upload|delete] -i \n\n will upload/delete datasets in testsuites in *path1*\n\n2. exclude:\n\n .. code-block:: bash\n\n fate_test data [upload|delete] -i -e -e ...\n\n will upload/delete datasets in testsuites in *path1* but not in *path2* and *path3*\n\n3. glob:\n\n .. code-block:: bash\n\n fate_test data [upload|delete] -i -g "hetero*"\n\n will upload/delete datasets in testsuites in subdirectories of *path1* whose names start with *hetero*\n\n\nfull command options\n---------------------\n\n.. click:: fate_test.scripts.cli:cli\n :prog: fate_test\n :show-nested:\n', + "author": "FederatedAI", + "author_email": "contact@FedAI.org", + "maintainer": None, + "maintainer_email": None, + "url": "https://fate.fedai.org/", + "packages": packages, + "package_data": package_data, + "install_requires": install_requires, + "entry_points": entry_points, + "python_requires": ">=3.8", +} + +setup(**setup_kwargs)