diff --git a/RELEASE.md b/RELEASE.md index 67058bb6b..69484e7bb 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,3 +1,13 @@ +## Release 2.1.0 +### Major Features and Improvements +Improved the display issue of output data. +Enhanced the PyPI package: configuration files have been relocated to the user's home directory, and the relative paths for uploading data are based on the user's home directory. +Supported running FATE algorithms in Spark on YARN client mode. + +### Bug-Fix +Fixed an issue where failed tasks could not be retried. +Fixed an issue where the system couldn't run when the task cores exceeded the system total cores. + ## Release 2.0.0 ### Major Features and Improvements * Adapted to new scalable and standardized federated DSL IR diff --git a/doc/fate_access.md b/doc/fate_access.md new file mode 100644 index 000000000..669e40b51 --- /dev/null +++ b/doc/fate_access.md @@ -0,0 +1,195 @@ +# FATE 2.0 Version Interconnection Guide + +## 1. FATE Flow Integration Guide +- Description: This section provides guidance on integrating heterogeneous scheduling platforms with the FATE scheduling platform's FATE Flow. +- Scenario: This side is the system to be integrated, and the partner is the FATE site. + +### 1.1 Interfaces +![api](./images/fate_flow_api.png) +FATE Flow interfaces are divided into 4 categories: +- 1.responsible for receiving requests from upper-level systems, such as submitting, stopping, and querying jobs; +- 2.responsible for receiving requests from the scheduling layer, such as starting and stopping tasks; +- 3.responsible for receiving requests from algorithm containers, such as task status, input reporting, etc.; +- 4.responsible for receiving requests from the platform layer and distributing them to the interfaces of the participating parties. + +#### 1.1.1 api-1 +Description: Since it is about integrating with the upper-level system and does not involve interaction between schedulers, this interface is optional and can be customized without constraints. + +#### 1.1.2 api-2 +Refer to [interface](./../python/fate_flow/apps/partner/partner_app.py) implementation +- `/v2/partner/job/create`: Create a job +- `/v2/partner/job/start`: Start a job +- `/v2/partner/job/status/update`: Update job status +- `/v2/partner/job/update`: Update job (e.g., progress information) +- `/v2/partner/job/resource/apply`: Apply for job resources +- `/v2/partner/job/resource/return`: Return job resources +- `/v2/partner/job/stop`: Stop job +- `/v2/partner/task/resource/apply`: Apply for task resources +- `/v2/partner/task/resource/return`: Return task resources +- `/v2/partner/task/start`: Start task +- `/v2/partner/task/collect`: Scheduler collects task status +- `/v2/partner/task/status/update`: Update task status +- `/v2/partner/task/stop`: Stop task +- `/v2/partner/task/rerun`: Rerun task + +#### 1.1.3 api-3 +Refer to [interface](./../python/fate_flow/apps/worker/worker_app.py) implementation +- `/v2/worker/task/status`: Status report +- `/v2/worker/model/save`: Save model +- `/v2/worker/model/download`: Download model +- `/v2/worker/data/tracking/query`: Query data +- `/v2/worker/data/tracking/save`: Record data +- `/v2/worker/metric/save/`: Record metrics + +#### 1.1.4 api-4 +Refer to [interface](./../python/fate_flow/apps/scheduler/scheduler_app.py) implementation +- `/v2/scheduler/job/create`: Create a job +- `/v2/scheduler/job/stop`: Stop a job +- `/v2/scheduler/task/report`: Task report (e.g., status) +- `/v2/scheduler/job/rerun`: Rerun a job + +### 1.2 Scheduler +The scheduler mainly consists of two parts: scheduling logic and scheduling interface. In the case of interconnection in a heterogeneous scenario, a unified scheduling process and interface are indispensable. In the case mentioned above, when using FATE Flow as the scheduling party in connection with other vendors, the implementation of the scheduler can be ignored. + +#### 1.2.1 Approach +The core of scheduling is the scheduling process, which defines the lifecycle of a job. In version 1.x of FATE, the scheduler and the initiator logic are bound, meaning the coordination scheduling of jobs from multiple parties is done at the initiator. This has a disadvantage: suppose companies A, B, and C each have the need to initiate tasks, their scheduling layers need to implement the scheduler based on the same scheduling logic, and the cost of interconnection is high. In version 2.0, the initiator and scheduler logic in the scheduling module are decoupled, and the scheduler can be specified in the job configuration. In the above case, as long as any one of A, B, or C companies implements the scheduler, or directly uses FATE as the scheduler, other vendors only need to implement the scheduler client interface to meet the requirements, greatly reducing the cost of interconnection. + +![scheduler](./images/scheduler.png) +

P represents the scheduling client interface, S represents the scheduler interface

+ + +To illustrate this scheduling mode with an example: Suppose A wants to create a job with C, and FATE Flow is the scheduler. First, A requests the FATE-Flow S (create-job) interface. After receiving the request, FATE Flow obtains participant information (A, C) through job configuration, and then distributes it to the P (create-job) interface of each participant. + +#### 1.2.2 Scheduling Logic +It manages the lifecycle of jobs, including when to start and stop jobs, when to start and stop tasks, DAG parsing, and component runtime dependencies, etc. FATE Flow's scheduling process is divided into two modes based on task status acquisition: callback and poll. Among them, the callback mode is for the participants to actively report task status to the scheduler, and the poll mode is for the scheduler to pull task status from the participants at regular intervals. The scheduling process diagrams for the two modes are as follows: + +![schedule-for-callback](./images/schedule_for_callback.png) +

Callback Mode

+ + +![schedule-for-poll](./images/schedule_for_poll.png) +

Poll Mode

+ + +#### 1.2.3 Scheduling Interface +Responsible for receiving requests from the platform layer and distributing them to the interfaces of various participants [api-2](#api-2), such as creating jobs, stopping jobs, etc. Interfaces see [api-4](#api-4) + + +## 2 Algorithm Integration Guide +In previous versions of FATE, algorithms ran as local processes started by the scheduling service, and there were shortcomings in terms of scalability, making it difficult to meet the needs of interconnection. In version 2.0, the "algorithm container" is used to run algorithms, implementing heterogeneous algorithm scheduling functionality through a standardized algorithm image construction and loading mechanism. + +![scheduler](./images/federationml_schedule.png) + +### 2.1 FATE Algorithm Containerization Solution +- Pre-processing: Input processing for data, models, algorithm parameters, etc., will call the platform-layer interface [api-3](#api-3) to obtain relevant dependencies. +- Component runtime: Algorithm component logic. +- Post-processing: Output content processing for algorithm components, will call the platform-layer interface [api-3](#api-3) to upload the output to the platform. +![](./images/schedule_for_component.png) + +### 2.2 Integration +#### 2.2.1 Algorithm Parameters +FATE Flow will pass parameters to the algorithm container in the form of environment variables, with the key being "CONFIG" and the parameter value being a JSON string. The content is as follows: +``` +component: psi +computing_partitions: 8 +conf: + computing: + metadata: + computing_id: 202402271112016150790_psi_0_0_host_9998 + host:127.0.0.1 + port:4670 + type: standalone/eggroll/spark + device: + metadata: {} + type: CPU + federation: + metadata: + federation_id: 202402271112016150790_psi_0_0 + parties: + local: + partyid: '9998' + role: host + parties: + - partyid: '9999' + role: guest + - partyid: '9998' + role: host + osx_config: + host: 127.0.01 + port: 9370 + type: osx + logger: + config: + storage: standalone/eggroll/hdfs +engine_run: + cores: 4 +input_artifacts: + data: + input_data: + output_artifact_key: output_data + output_artifact_type_alias: null + parties: + - party_id: + - '9998' + role: host + producer_task: reader_0 + model: null +job_id: '202402271112016150790' +launcher_conf: {} +launcher_name: default +mlmd: + metadata: + api_version: v2 + host: 127.0.0.1 + port: 9380 + protocol: http + type: flow +model_id: '202402271112016150790' +model_version: '0' +parameters: {} +party_id: '9998' +party_task_id: 202402271112016150790_psi_0_0_host_9998 +provider_name: fate +role: host +stage: default +task_id: 202402271112016150790_psi_0 +task_name: psi_0 +task_version: '0' +``` +Here are the key configurations: +- `component`: The name of the algorithm. When multiple algorithms are packaged in the same image, this parameter is used to identify them. +- `conf.computing`: Configuration for the computing engine. +- `conf.federation`: Configuration for the communication engine. +- `conf.storage`: Configuration for the storage engine, supporting standalone/eggroll and hdfs. +- `mlmd`: Platform-layer interface used for recording the output of the algorithm. The interface is [api-3](#api-3). +- `input_artifacts`: Input dependencies, including data, models, etc. +- `parameters`: Algorithm parameters. +The entry point for starting the algorithm needs to be specified with CMD when building the image, and the algorithm should call the status reporting interface in [api-3](#api-3) upon completion. + + +#### 2.2.2 Registering Algorithm Image +```shell +flow provider register -c examples/provider/register_image.json +``` +Where `register_image.json` looks like this: +```json +{ + "name": "fate", + "device": "docker", + "version": "2.1.0", + "metadata": { + "base_url": "", + "image": "federatedai/fate:2.1.0" + } +} +``` + +#### 2.2.3 Using Algorithm Image +After registration, in the DAG of the job configuration, you can specify the provider to run this FATE algorithm image, as shown below: +```yaml +dag: + conf: + task: + provider: fate:2.1.0@docker +``` +Alternatively, you can specify this image for a specific algorithm. For details, refer to the [provider guide](./provider_register.md). \ No newline at end of file diff --git a/doc/fate_access.zh.md b/doc/fate_access.zh.md new file mode 100644 index 000000000..17022b056 --- /dev/null +++ b/doc/fate_access.zh.md @@ -0,0 +1,202 @@ +# FATE 2.0 版本互联互通接入指南 + +## 1. FATE Flow接入指南 +- 说明:此章节为异构调度平台对接FATE调度平台FATE FLow的接入指南 +- 场景:本方为待接入系统,合作方为FATE站点 + +### 1.1 接口 +![api](./images/fate_flow_api.png) +FATE flow接口划分为4类: +- 1负责接收来自上层系统的请求,如提交、停止、查询作业等; +- 2负责接收来自调度层的请求,如开始、停止任务等; +- 3负责接收来自算法容器的请求,如任务的状态、输入上报等 +- 4负责来自平台层的请求,并分发给各个参与方的接口2,如创建作业,停止作业等 + +#### 1.1.1 api-1 +说明:由于是对接上层系统,并不涉及调度之间的交互,非必需接口,可自定义实现,接口层面不做约束。 + +#### 1.1.2 api-2 +可参考[接口](./../python/fate_flow/apps/partner/partner_app.py)实现 +- `/v2/partner/job/create`: 创建作业 +- `/v2/partner/job/start`: 开始作业 +- `/v2/partner/job/status/update`: 更新作业状态 +- `/v2/partner/job/update`: 更新作业(如进度信息) +- `/v2/partner/job/resource/apply`: 申请作业资源 +- `/v2/partner/job/resource/return`: 归还作业资源 +- `/v2/partner/job/stop`: 停止作业 +- `/v2/partner/task/resource/apply`: 申请任务资源 +- `/v2/partner/task/resource/return`: 归还任务资源 +- `/v2/partner/task/start`: 开始任务 +- `/v2/partner/task/collect`: 调度层收集任务状态 +- `/v2/partner/task/status/update`: 更新任务状态 +- `/v2/partner/task/stop`: 停止任务 +- `/v2/partner/task/rerun`: 重跑任务 + +#### 1.1.3 api-3 +可参考[接口](./../python/fate_flow/apps/worker/worker_app.py)实现 +- `/v2/worker/task/status`: 状态上报 +- `/v2/worker/model/save`: 模型存储 +- `/v2/worker/model/download`: 模型下载 +- `/v2/worker/data/tracking/query`: 查询数据 +- `/v2/worker/data/tracking/save`: 记录数据 +- `/v2/worker/metric/save/`: 记录指标 + +#### 1.1.4 api-4 +可参考[接口](./../python/fate_flow/apps/scheduler/scheduler_app.py)实现 +- `/v2/scheduler/job/create`: 创建作业、 +- `/v2/scheduler/job/stop`: 停止作业 +- `/v2/scheduler/task/report`: 任务上报(如状态) +- `/v2/scheduler/job/rerun`: 重跑作业 + + +### 1.2 调度器 +调度器主要包括两部分:调度逻辑和调度接口。异构的场景下的调度层想要实现互联互通,统一的调度流程和接口是不可或缺的。上述提到若使用FATE Flow作为调度方,与其它厂商互联时,可忽略调度器的实现。 + +#### 1.2.1 方案 +调度的核心是调度流程,流程定义作业的生命周期。在FATE 1.x版本中调度器与发起方逻辑是绑定的,即多方作业的协调调度是在发起方。 +这样有个弊处:假设A、B、C三家厂商各自都有发起任务的需求,他们的调度层都需要基于相同的调度逻辑实现调度器,互联互通的成本较高。 +在2.0版本中,将调度模块中的发起方与调度方逻辑解耦,且调度方可以在作业配置中被指定。在上诉的案例中,只要A、B、C厂商中的任意一家实现了调度器, +或者直接使用FATE作为调度方,其他厂商只需要实现调度客户端接口,即可满足需求,大大降低互联互通成本。 + +![scheduler](./images/scheduler.png) +

P代表调度客户端接口,S代表调度器接口

+ + +举个例子简单说明下该调度模式:假设A想要和C创建作业,调度方为FATE Flow。首先A请求FATE-FLow S(create-job)接口, FATE FLow收到请求后通过job配置获取参与方信息(A、C),随即分发给参与方各自的P(create-job)接口。 + + +#### 1.2.2 调度逻辑 +对作业的生命周期管理,主要包括作业何时启停、任务何时启停、DAG解析、组件运行依赖等等。FATE FLow的调度流程按任务状态获取模式分为两种: +callback和poll。其中callback模式是由各参与方主动上报任务状态给调度方,poll模式是调度方定时向各参与方拉取任务状态。 +两种模式对应的调度流程图如下: + +![schedule-for-callback](./images/schedule_for_callback.png) +

callback模式

+ + +![schedule-for-poll](./images/schedule_for_poll.png) +

poll模式

+ + +#### 1.2.3 调度接口 +负责来自平台层的请求,并分发给各个参与方的[api-2](#api-2),如创建作业,停止作业等。接口见[api-4](#api-4) + + +## 2 算法接入指南 +在FATE 历史版本中,算法是以调度服务启动的本地进程方式运行,在扩展性方面存在不足,很难满足互联互通的需求。在2.0版本中采用“算法容器”运行算法, +通过制定统一的算法镜像构建标准与定义一套规范的镜像加载机制来实现异构算法调度功能。 +![scheduler](./images/federationml_schedule.png) + +### 2.1 FATE算法容器化方案 +- 前处理: 数据、模型、算法参数等输入处理,会调用平台层接口[api-3](#api-3)从平台获取相关依赖 +- 组件运行: 算法组件逻辑 +- 后处理: 算法组件输出内容处理,会调用平台层接口[api-3](#api-3)将输出上传到平台 +![](./images/schedule_for_component.png) + +### 2.2 接入 +#### 2.2.1 算法参数 +FATE FLow会以环境变量的形式将参数传递给算法容器,环境变量的key为"CONFIG",参数值为JSON字符串。内容如下: +``` +component: psi +computing_partitions: 8 +conf: + computing: + metadata: + computing_id: 202402271112016150790_psi_0_0_host_9998 + host:127.0.0.1 + port:4670 + type: standalone/eggroll/spark + device: + metadata: {} + type: CPU + federation: + metadata: + federation_id: 202402271112016150790_psi_0_0 + parties: + local: + partyid: '9998' + role: host + parties: + - partyid: '9999' + role: guest + - partyid: '9998' + role: host + osx_config: + host: 127.0.01 + port: 9370 + type: osx + logger: + config: + storage: standalone/eggroll/hdfs +engine_run: + cores: 4 +input_artifacts: + data: + input_data: + output_artifact_key: output_data + output_artifact_type_alias: null + parties: + - party_id: + - '9998' + role: host + producer_task: reader_0 + model: null +job_id: '202402271112016150790' +launcher_conf: {} +launcher_name: default +mlmd: + metadata: + api_version: v2 + host: 127.0.0.1 + port: 9380 + protocol: http + type: flow +model_id: '202402271112016150790' +model_version: '0' +parameters: {} +party_id: '9998' +party_task_id: 202402271112016150790_psi_0_0_host_9998 +provider_name: fate +role: host +stage: default +task_id: 202402271112016150790_psi_0 +task_name: psi_0 +task_version: '0' +``` +其中,关键的配置为: +- component:算法名。多个算法打包在同一个镜像时可通过该参数标识 +- conf.computing: 为计算引擎配置 +- conf.federation: 为通信引擎配置 +- conf.storage: 为存储引擎配置,支持standalone/eggroll和hdfs +- mlmd: 为平台层接口,供算法的输出记录使用。接口为[api-3](#api-3) +- input_artifacts:输入依赖,包括数据、模型等 +- parameters:算法参数 +算法的启动入口需要在打镜像时指定CMD,算法结束需要调用[api-3](#api-3)中的状态上报接口 + + +#### 2.2.2 注册算法镜像 +```shell +flow provider register -c examples/provider/register_image.json +``` +其中,register_image.json如下: +```json +{ + "name": "fate", + "device": "docker", + "version": "2.1.0", + "metadata": { + "base_url": "", + "image": "federatedai/fate:2.1.0" + } +} +``` + +#### 2.2.3 使用算法镜像 +注册完成后,在作业配置的DAG中可以指定provider来运行此fate算法镜像,参考如下: +```yaml +dag: + conf: + task: + provider: fate:2.1.0@docker +``` +或者你也可以指定单个算法使用此镜像,详细可参考[provider指南](./provider_register.zh.md) diff --git a/doc/images/fate_flow_api.png b/doc/images/fate_flow_api.png new file mode 100644 index 000000000..d1c07f1ae Binary files /dev/null and b/doc/images/fate_flow_api.png differ diff --git a/doc/images/scheduler.png b/doc/images/scheduler.png new file mode 100644 index 000000000..d625749e8 Binary files /dev/null and b/doc/images/scheduler.png differ diff --git a/examples/lr/train_lr.yaml b/examples/lr/train_lr.yaml index 1b13597d7..ccd59ffe9 100644 --- a/examples/lr/train_lr.yaml +++ b/examples/lr/train_lr.yaml @@ -80,7 +80,7 @@ dag: - lr_0 inputs: data: - input_data: + input_datas: task_output_artifact: - output_artifact_key: train_output_data producer_task: lr_0 diff --git a/examples/provider/register_image.json b/examples/provider/register_image.json new file mode 100644 index 000000000..c5d06dc86 --- /dev/null +++ b/examples/provider/register_image.json @@ -0,0 +1,9 @@ +{ + "name": "fate", + "device": "docker", + "version": "2.1.0", + "metadata": { + "base_url": "", + "image": "federatedai/fate:2.1.0" + } +} \ No newline at end of file diff --git a/fateflow.env b/fateflow.env index 3d358ab8a..e80e03fca 100644 --- a/fateflow.env +++ b/fateflow.env @@ -1,3 +1,3 @@ -FATE=2.0.0 -FATE_FLOW=2.0.0 +FATE=2.1.0 +FATE_FLOW=2.1.0 PYTHON=3.8 \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index c04d0c299..a75b3cf16 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -15,6 +15,7 @@ nav: - job_scheduling.md - provider_register.md - system_conf.md + - fate_access.md - bfia_access.md - API: swagger/index.md diff --git a/python/fate_flow/_info.py b/python/fate_flow/_info.py index 319c98fb7..4093fdb3f 100644 --- a/python/fate_flow/_info.py +++ b/python/fate_flow/_info.py @@ -12,5 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "2.0.0" +__version__ = "2.1.0" __provider__ = "fate_flow" diff --git a/python/fate_flow/apps/client/data_app.py b/python/fate_flow/apps/client/data_app.py index 2612b63f0..1b32c7074 100644 --- a/python/fate_flow/apps/client/data_app.py +++ b/python/fate_flow/apps/client/data_app.py @@ -14,6 +14,8 @@ # limitations under the License. # import json +import os.path + from webargs import fields from flask import request from fate_flow.apps.desc import SERVER_FILE_PATH, HEAD, PARTITIONS, META, EXTEND_SID, NAMESPACE, NAME, DATA_WAREHOUSE, \ @@ -21,6 +23,7 @@ from fate_flow.engine import storage from fate_flow.manager.components.component_manager import ComponentManager from fate_flow.manager.outputs.data import DataManager +from fate_flow.settings import UPLOAD_DATA_HOME from fate_flow.utils.api_utils import API from fate_flow.errors.server_error import NoFoundTable, NoFoundFile @@ -36,6 +39,9 @@ @API.Input.json(namespace=fields.String(required=False), desc=NAMESPACE) @API.Input.json(name=fields.String(required=False), desc=NAME) def upload_data(file, head, partitions, meta, namespace=None, name=None, extend_sid=False): + if not os.path.isabs(file): + if UPLOAD_DATA_HOME: + file = os.path.join(UPLOAD_DATA_HOME, file) if namespace and name: result = ComponentManager.upload_dataframe( file=file, head=head, partitions=partitions, meta=meta, namespace=namespace, name=name, extend_sid=extend_sid diff --git a/python/fate_flow/commands/server_cli.py b/python/fate_flow/commands/server_cli.py index 86c741c83..cb8ff32bb 100644 --- a/python/fate_flow/commands/server_cli.py +++ b/python/fate_flow/commands/server_cli.py @@ -14,6 +14,7 @@ # limitations under the License. # import os +import shutil import subprocess import platform import click @@ -21,10 +22,12 @@ import fate_flow from fate_flow.commands.service import manage_fate_service +from fate_flow.settings import DEFAULT_SERVER_CONF_PATH CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) HOME = os.path.dirname(fate_flow.__file__) -SERVER_CONF_PATH = os.path.join(HOME, "conf", "service_conf.yaml") +CONF_PATH = DEFAULT_SERVER_CONF_PATH or os.path.join(HOME, "conf") +SERVER_CONF_PATH = os.path.join(CONF_PATH, "service_conf.yaml") SETTING_PATH = os.path.join(HOME, "settings.py") SERVICE_SH = os.path.join(HOME, "commands", "service.sh") @@ -138,7 +141,14 @@ def get_version(): print(fate_flow.__version__) -def replace_settings(home_path): +def set_conf_home(home_path): + default_conf = os.path.join(HOME, "conf") + conf_home = f"{home_path}/conf" + shutil.copytree(default_conf, conf_home) + return conf_home + + +def replace_settings(home_path, data_home): import re with open(SETTING_PATH, "r") as file: content = file.read() @@ -146,7 +156,11 @@ def replace_settings(home_path): content = re.sub(r"MODEL_DIR.*", f"MODEL_DIR = \"{home_path}/model\"", content) content = re.sub(r"JOB_DIR.*", f"JOB_DIR = \"{home_path}/jobs\"", content) content = re.sub(r"LOG_DIR.*", f"LOG_DIR = \"{home_path}/logs\"", content) + content = re.sub(r"UPLOAD_DATA_HOME.*", f"UPLOAD_DATA_HOME = \"{data_home}\"", content) content = re.sub(r"SQLITE_FILE_NAME.*", f"SQLITE_FILE_NAME = \"{home_path}/fate_flow_sqlite.db\"", content) + + content = re.sub(r"DEFAULT_SERVER_CONF_PATH.*", f"DEFAULT_SERVER_CONF_PATH = \"{home_path}/conf\"", content) + with open(SETTING_PATH, "w") as file: file.write(content) @@ -158,6 +172,7 @@ def replace_settings(home_path): def init_server(ip, port, home): + conf_home = CONF_PATH with open(SERVER_CONF_PATH, "r") as file: config = yaml.safe_load(file) if ip: @@ -170,11 +185,16 @@ def init_server(ip, port, home): if not os.path.isabs(home): raise RuntimeError(f"Please use an absolute path: {home}") os.makedirs(home, exist_ok=True) + data_home = os.path.join(home, "upload") + os.makedirs(data_home, exist_ok=True) print(f"home: {home}") - replace_settings(home) + conf_home = set_conf_home(home) + replace_settings(home, data_home) if ip or port: - with open(SERVER_CONF_PATH, "w") as file: + service_conf_path = SERVER_CONF_PATH if not conf_home else os.path.join(conf_home, "service_conf.yaml") + print(f"Conf path: {service_conf_path}") + with open(service_conf_path, "w") as file: yaml.dump(config, file, default_flow_style=False) print("Init server completed!") @@ -182,7 +202,8 @@ def init_server(ip, port, home): def run_command(command): try: - command = f"bash {SERVICE_SH} {HOME} {command}" + service_conf_path = os.path.join(CONF_PATH, "service_conf.yaml") + command = f"bash {SERVICE_SH} {HOME} {command} {service_conf_path}" result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True) if result.returncode == 0: print(result.stdout) diff --git a/python/fate_flow/commands/service.sh b/python/fate_flow/commands/service.sh index ebcd442a4..fcd554572 100644 --- a/python/fate_flow/commands/service.sh +++ b/python/fate_flow/commands/service.sh @@ -36,6 +36,7 @@ # # ----------------------------------------------------------------------------- FATE_FLOW_BASE=$1 +CONF_PATH=$3 LOG_DIR=$FATE_FLOW_BASE/logs @@ -144,7 +145,6 @@ load_config() { LOG_STDOUT="${LOG_DIR}/console.log" print_info "Checking service configuration..." - CONF_PATH="${FATE_FLOW_BASE}/conf/service_conf.yaml" if [ -f "${CONF_PATH}" ]; then print_ok "Service configuration file found." ${CONF_PATH} else diff --git a/python/fate_flow/components/define/download.yaml b/python/fate_flow/components/define/download.yaml index 63fccb1a5..69f017131 100644 --- a/python/fate_flow/components/define/download.yaml +++ b/python/fate_flow/components/define/download.yaml @@ -2,7 +2,7 @@ component: name: download description: '' provider: fate_flow - version: 2.0.0 + version: 2.1.0 labels: [] roles: - guest diff --git a/python/fate_flow/components/define/upload.yaml b/python/fate_flow/components/define/upload.yaml index 4027ef457..158f884f4 100644 --- a/python/fate_flow/components/define/upload.yaml +++ b/python/fate_flow/components/define/upload.yaml @@ -2,7 +2,7 @@ component: name: upload description: '' provider: fate_flow - version: 2.0.0 + version: 2.1.0 labels: [] roles: - guest diff --git a/python/fate_flow/components/entrypoint/cli.py b/python/fate_flow/components/entrypoint/cli.py index 196669b25..4b376ec62 100644 --- a/python/fate_flow/components/entrypoint/cli.py +++ b/python/fate_flow/components/entrypoint/cli.py @@ -34,7 +34,7 @@ def component(): @component.command() @click.option("--config", required=False, type=click.File(), help="config path") -@click.option("--env-name", required=False, type=str, help="env name for config") +@click.option("--env-name", required=True, type=str, help="env name for config", default="CONFIG") @click.option("--wraps-module", required=False, type=str, help="component run wraps module") def entrypoint(config, env_name, wraps_module): # parse config diff --git a/python/fate_flow/controller/parser.py b/python/fate_flow/controller/parser.py index 87dfc91f4..3208247a9 100644 --- a/python/fate_flow/controller/parser.py +++ b/python/fate_flow/controller/parser.py @@ -248,7 +248,10 @@ def generate_computing_conf(self): if ENGINES.get(EngineType.COMPUTING).lower() == ComputingEngine.SPARK: return SparkComputingSpec( type=ENGINES.get(EngineType.COMPUTING).lower(), - metadata={"computing_id": self.computing_id} + metadata={ + "computing_id": self.computing_id, + "options": {"home": COMPUTING_CONF.get(ComputingEngine.SPARK).get("home")} + } ) @staticmethod diff --git a/python/fate_flow/controller/task.py b/python/fate_flow/controller/task.py index 3c3453f2f..f87fef197 100644 --- a/python/fate_flow/controller/task.py +++ b/python/fate_flow/controller/task.py @@ -120,8 +120,8 @@ def update_local(task): @staticmethod def update_launcher_config(task, launcher_name, task_parameters): # support deepspeed and other launcher - if task.f_role == "arbiter": - return + # if task.f_role == "arbiter": + # return schedule_logger(task.f_job_id).info(f"task runtime launcher name: {launcher_name}") if launcher_name and launcher_name != LauncherType.DEFAULT: task_parameters.launcher_name = task.f_launcher_name = launcher_name diff --git a/python/fate_flow/engine/backend/_spark.py b/python/fate_flow/engine/backend/_spark.py index 3f7782720..2a16fe236 100644 --- a/python/fate_flow/engine/backend/_spark.py +++ b/python/fate_flow/engine/backend/_spark.py @@ -16,13 +16,17 @@ import os from fate_flow.engine.backend._base import LocalEngine +from fate_flow.entity.spec.dag import TaskConfigSpec from fate_flow.entity.types import WorkerName from fate_flow.manager.service.worker_manager import WorkerManager class SparkEngine(LocalEngine): def run(self, task_info, run_parameters, conf_path, output_path, engine_run, provider_name, **kwargs): - spark_home = os.environ.get("SPARK_HOME", None) + spark_home = None + parameters = TaskConfigSpec.parse_obj(run_parameters) + if parameters.conf.computing.metadata.options: + spark_home = parameters.conf.computing.metadata.options.get("home") if not spark_home: try: import pyspark diff --git a/python/fate_flow/engine/devices/container.py b/python/fate_flow/engine/devices/container.py index 6bbd45894..6eca3e714 100644 --- a/python/fate_flow/engine/devices/container.py +++ b/python/fate_flow/engine/devices/container.py @@ -56,7 +56,8 @@ def _flatten_dict(cls, data, parent_key='', sep='.'): @classmethod def _get_environment(cls, task: Task, run_parameters): - return cls._flatten_dict(run_parameters) + # return cls._flatten_dict(run_parameters) + return {"CONFIG": json.dumps(run_parameters)} @classmethod def _get_volume(cls, task): diff --git a/python/fate_flow/entity/code/_api.py b/python/fate_flow/entity/code/_api.py index 960285208..b145b59d5 100644 --- a/python/fate_flow/entity/code/_api.py +++ b/python/fate_flow/entity/code/_api.py @@ -25,7 +25,8 @@ class Job: UPDATE_FAILED = 1003 KILL_FAILED = 1004 RESOURCE_EXCEPTION = 1005 - INHERITANCE_FAILED = 1006 + RESOURCE_LIMIT_EXCEEDED = 1006 + INHERITANCE_FAILED = 1007 class Task: NOT_FOUND = 2000 diff --git a/python/fate_flow/manager/outputs/data.py b/python/fate_flow/manager/outputs/data.py index 48832ca42..7883129bb 100644 --- a/python/fate_flow/manager/outputs/data.py +++ b/python/fate_flow/manager/outputs/data.py @@ -134,7 +134,11 @@ def display_data(table_metas): datas[key] = [] for meta in metas: if meta.data_type in [DataType.DATAFRAME, DataType.TABLE]: - datas[key].append({"data": meta.get_part_of_data(), "metadata": meta.get_data_meta()}) + datas[key].append({ + "data": meta.get_part_of_data(), + "metadata": meta.get_data_meta(), + "total": meta.get_count()} + ) else: continue return datas @@ -173,10 +177,12 @@ def display_output_data(cls, **kwargs): if key not in outputs: outputs[key] = [] for table in tables: - outputs[key].append(storage.StorageTableMeta( + meta = storage.StorageTableMeta( name=table.get("name"), namespace=table.get("namespace") - )) + ) + if meta: + outputs[key].append(meta) return cls.display_data(outputs) @staticmethod diff --git a/python/fate_flow/manager/service/resource_manager.py b/python/fate_flow/manager/service/resource_manager.py index 55aebdb97..423d1205f 100644 --- a/python/fate_flow/manager/service/resource_manager.py +++ b/python/fate_flow/manager/service/resource_manager.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from fate_flow.entity.code import ReturnCode from pydantic import typing from fate_flow.db.base_models import DB @@ -90,6 +91,9 @@ def resource_for_job(cls, job_id, role, party_id, operation_type: ResourceOperat operate_status = False cores, memory = cls.query_job_resource(job_id=job_id, role=role, party_id=party_id) engine_name = ENGINES.get(EngineType.COMPUTING) + total_cores = EngineRegistry.query(engine_name=engine_name)[0].f_cores + if cores > total_cores: + raise RuntimeError(ReturnCode.Job.RESOURCE_LIMIT_EXCEEDED, "Resource limit exceeded") try: with DB.atomic(): updates = { diff --git a/python/fate_flow/manager/service/service_manager.py b/python/fate_flow/manager/service/service_manager.py index 65bcf7080..ea9e6f652 100644 --- a/python/fate_flow/manager/service/service_manager.py +++ b/python/fate_flow/manager/service/service_manager.py @@ -16,6 +16,7 @@ import abc import atexit import json +import os import socket import time from functools import wraps @@ -24,6 +25,7 @@ from threading import Thread from urllib import parse +from fate_flow.utils.file_utils import get_fate_flow_directory from kazoo.client import KazooClient from kazoo.exceptions import NodeExistsError, NoNodeError, ZookeeperError from kazoo.security import make_digest_acl @@ -37,6 +39,7 @@ from fate_flow.runtime.reload_config_base import ReloadConfigBase from fate_flow.runtime.system_settings import RANDOM_INSTANCE_ID, HOST, HTTP_PORT, GRPC_PORT, ZOOKEEPER_REGISTRY, \ ZOOKEEPER, USE_REGISTRY, NGINX_HOST, NGINX_HTTP_PORT, FATE_FLOW_MODEL_TRANSFER_ENDPOINT, SERVICE_CONF_NAME +from fate_flow.settings import DEFAULT_SERVER_CONF_PATH from fate_flow.utils import conf_utils, file_utils from fate_flow.utils.log import getLogger from fate_flow.utils.version import get_flow_version @@ -421,7 +424,8 @@ def load_server_info_from_db(cls): @classmethod def load_server_info_from_conf(cls): - path = Path(file_utils.get_fate_flow_directory()) / 'conf' / SERVICE_CONF_NAME + conf_path = DEFAULT_SERVER_CONF_PATH or os.path.join(get_fate_flow_directory(), "conf") + path = Path(conf_path) / SERVICE_CONF_NAME conf = file_utils.load_yaml_conf(path) if not isinstance(conf, dict): raise ValueError('invalid config file') diff --git a/python/fate_flow/runtime/system_settings.py b/python/fate_flow/runtime/system_settings.py index 245c864e6..1f0e1b98b 100644 --- a/python/fate_flow/runtime/system_settings.py +++ b/python/fate_flow/runtime/system_settings.py @@ -35,7 +35,7 @@ APP_MANAGER_PAGE = "app" ADMIN_PAGE = [PERMISSION_MANAGER_PAGE, APP_MANAGER_PAGE] -FATE_FLOW_CONF_PATH = os.path.join(get_fate_flow_directory(), "conf") +FATE_FLOW_CONF_PATH = DEFAULT_SERVER_CONF_PATH or os.path.join(get_fate_flow_directory(), "conf") FATE_FLOW_JOB_DEFAULT_CONFIG_PATH = os.path.join(FATE_FLOW_CONF_PATH, "job_default_config.yaml") diff --git a/python/fate_flow/scheduler/scheduler.py b/python/fate_flow/scheduler/scheduler.py index bfc3a44bd..46aa41852 100644 --- a/python/fate_flow/scheduler/scheduler.py +++ b/python/fate_flow/scheduler/scheduler.py @@ -133,11 +133,16 @@ def apply_job_resource(cls, job): def rollback_job_resource(cls, job, federated_response): rollback_party = [] failed_party = [] + stop_status = False for dest_role in federated_response.keys(): for dest_party_id in federated_response[dest_role].keys(): retcode = federated_response[dest_role][dest_party_id]["code"] if retcode == ReturnCode.Base.SUCCESS: rollback_party.append({"role": dest_role, "party_id": [dest_party_id]}) + elif retcode == ReturnCode.Job.RESOURCE_LIMIT_EXCEEDED: + # stop job + schedule_logger(job.f_job_id).exception(f"{dest_role} {dest_party_id} resource limit exceeded") + stop_status = True else: failed_party.append({"role": dest_role, "party_id": [dest_party_id]}) schedule_logger(job.f_job_id).info("job apply resource failed on {}, rollback {}".format(failed_party, @@ -153,6 +158,10 @@ def rollback_job_resource(cls, job, federated_response): else: schedule_logger(job.f_job_id).info("job no party should be rollback resource") + if stop_status: + cls.stop_job(job.f_job_id, stop_status=JobStatus.FAILED) + ScheduleJobSaver.update_job_status({"job_id": job.f_job_id, "status": JobStatus.FAILED}) + @classmethod @wraps_utils.schedule_lock def schedule_waiting_jobs(cls, job: ScheduleJob): @@ -317,7 +326,11 @@ def rerun_job(cls, job_id, auto, tasks: typing.List[ScheduleTaskStatus] = None): schedule_logger(job_id).info(f"require {[task.f_task_name for task in tasks]} to rerun") else: # todo: get_need_revisit_nodes - tasks = ScheduleJobSaver.query_task(job_id=job_id, status=TaskStatus.CANCELED, scheduler_status=True) + tasks = ScheduleJobSaver.query_task( + job_id=job_id, + status=[TaskStatus.CANCELED, TaskStatus.FAILED, TaskStatus.TIMEOUT], + scheduler_status=True + ) job_can_rerun = any([TaskController.prepare_rerun_task( job=job, task=task, auto=auto, force=False, ) for task in tasks]) diff --git a/python/fate_flow/settings.py b/python/fate_flow/settings.py index 100932377..663ea7cc9 100644 --- a/python/fate_flow/settings.py +++ b/python/fate_flow/settings.py @@ -25,6 +25,8 @@ MODEL_DIR = "" JOB_DIR = "" DEFAULT_FATE_DIR = "" +DEFAULT_SERVER_CONF_PATH = "" +UPLOAD_DATA_HOME = "" # sqlite SQLITE_FILE_DIR = "" diff --git a/python/fate_flow/utils/conf_utils.py b/python/fate_flow/utils/conf_utils.py index 318960d9d..19b7b7bf6 100644 --- a/python/fate_flow/utils/conf_utils.py +++ b/python/fate_flow/utils/conf_utils.py @@ -16,14 +16,15 @@ import os from .file_utils import load_yaml_conf, get_fate_flow_directory +from ..settings import DEFAULT_SERVER_CONF_PATH SERVICE_CONF = "service_conf.yaml" TRANSFER_CONF = "transfer_conf.yaml" def conf_realpath(conf_name): - conf_path = f"conf/{conf_name}" - return os.path.join(get_fate_flow_directory(), conf_path) + conf_path = DEFAULT_SERVER_CONF_PATH or os.path.join(get_fate_flow_directory(), "conf") + return f"{conf_path}/{conf_name}" def get_base_config(key, default=None, conf_name=SERVICE_CONF) -> dict: diff --git a/python/requirements-docker.txt b/python/requirements-docker.txt index d58644821..f2e17348d 100644 --- a/python/requirements-docker.txt +++ b/python/requirements-docker.txt @@ -6,3 +6,6 @@ # container -r requirements-container.txt + +# eggroll-client +-r requirements-eggroll-client.txt diff --git a/python/requirements-eggroll-client.txt b/python/requirements-eggroll-client.txt new file mode 100644 index 000000000..d8a184799 --- /dev/null +++ b/python/requirements-eggroll-client.txt @@ -0,0 +1 @@ +eggroll \ No newline at end of file diff --git a/python/requirements-eggroll.txt b/python/requirements-eggroll.txt index 38e9ade97..63d09f2c9 100644 --- a/python/requirements-eggroll.txt +++ b/python/requirements-eggroll.txt @@ -1,6 +1,8 @@ -cloudpickle==2.1.0 -lmdb==1.3.0 +opentelemetry-api +opentelemetry-sdk protobuf==4.24.4 grpcio==1.59.3 grpcio-tools==1.59.3 -psutil>=5.7.0 \ No newline at end of file +cloudpickle==2.1.0 +psutil>=5.7.0 +pynvml==11.5.0 \ No newline at end of file diff --git a/python/requirements-fate.txt b/python/requirements-fate.txt index 283df32e6..fd164d99a 100644 --- a/python/requirements-fate.txt +++ b/python/requirements-fate.txt @@ -1,4 +1,5 @@ lmdb==1.3.0 +torch==1.13.1 fate_utils pydantic==1.10.12 cloudpickle==2.1.0 @@ -6,8 +7,8 @@ click ruamel-yaml==0.16 numpy pandas==2.0.3 -transformers==4.30.2 -accelerate==0.20.2 +transformers==4.37.2 +accelerate==0.27.2 beautifultable requests<2.26.0 scikit-learn