Skip to content

Commit

Permalink
Merge pull request #5269 from FederatedAI/feature-1.11.5-stats
Browse files Browse the repository at this point in the history
Feature 1.11.5 stats
  • Loading branch information
mgqa34 authored Nov 20, 2023
2 parents 87dd4f6 + 3040c4c commit 7b4374e
Show file tree
Hide file tree
Showing 15 changed files with 543 additions and 104 deletions.
2 changes: 2 additions & 0 deletions examples/pipeline/feature_imputation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ This section introduces the pipeline scripts of feature imputation tasks.
3. Feature Imputation using Different Methods for Different Columns:
script: pipeline-feature-imputation-column-method.py

4. Feature Imputation using Different Methods and Different Filling Values for Different Columns:
script: pipeline-feature-imputation-designated-column.py
Users can use the following commands to run the task.

python ${pipeline_script}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
"table_name": "dvisits_hetero_guest",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/mocked_string_data.csv",
"head": 1,
"partition": 4,
"table_name": "mocked_string_data",
"namespace": "experiment",
"role": "guest_0"
}
],
"pipeline_tasks": {
Expand All @@ -42,6 +50,9 @@
},
"diff-method-per-column": {
"script": "pipeline-feature-imputation-column-method.py"
},
"pipeline-feature-imputation-designated-column": {
"script": "pipeline-feature-imputation-designated-column.py"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse

from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataTransform
from pipeline.component import FeatureImputation
from pipeline.component import Reader
from pipeline.interface import Data
from pipeline.utils.tools import load_job_config


def main(config="../../config.yaml", namespace=""):
    """Train and predict a guest-only feature-imputation pipeline demo.

    Demonstrates per-column fill methods ('designated', 'mode', 'median')
    combined with per-column designated fill values on string-typed data.

    Parameters
    ----------
    config : str or config object
        Path to the job config yaml, or an already-loaded config object.
    namespace : str
        Suffix appended to the data namespace, for parallel test runs.
    """
    # Accept either a path or a pre-loaded config object.
    if isinstance(config, str):
        config = load_job_config(config)
    guest = config.parties.guest[0]

    guest_train_data = {
        "name": "mocked_string_data",
        "namespace": f"experiment{namespace}",
    }

    pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest)

    reader_0 = Reader(name="reader_0")
    reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)

    # Read every column as string except the label column 'y'.
    data_transform_0 = DataTransform(
        name="data_transform_0",
        with_label=False,
        data_type='str',
        exclusive_data_type={'y': 'int'},
    )

    # Per-column imputation: 'designated' columns are filled from
    # col_default_value (falling back to default_value when unspecified);
    # 'mode'/'median' are computed from the data. 'A' and 0 mark missing.
    feature_imputation_0 = FeatureImputation(
        name="feature_imputation_0",
        missing_fill_method=None,
        col_missing_fill_method={
            'x0': 'designated',
            'x1': 'designated',
            'x2': 'mode',
            'x4': 'mode',
            'y': 'median',
        },
        default_value='10',
        col_default_value={'x0': 'Z', 'x1': 'X'},
        missing_impute=['A', 0],
    )

    # Wire components: reader -> transform -> imputation, then train.
    pipeline.add_component(reader_0)
    pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
    pipeline.add_component(feature_imputation_0, data=Data(data=data_transform_0.output.data))
    pipeline.compile()
    pipeline.fit()

    # Prediction: deploy the trained components and replay the same reader.
    pipeline.deploy_component([data_transform_0, feature_imputation_0])

    predict_pipeline = PipeLine()
    predict_pipeline.add_component(reader_0)
    predict_pipeline.add_component(
        pipeline,
        data=Data(predict_input={pipeline.data_transform_0.input.data: reader_0.output.data}),
    )
    predict_pipeline.predict()


if __name__ == "__main__":
    # CLI entry point: optionally accept a job-config path.
    arg_parser = argparse.ArgumentParser("PIPELINE DEMO")
    arg_parser.add_argument("-config", type=str,
                            help="config file")
    cli_args = arg_parser.parse_args()
    if cli_args.config is None:
        # No path given: fall back to the default config location.
        main()
    else:
        main(cli_args.config)
3 changes: 3 additions & 0 deletions examples/pipeline/union/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ This section introduces the Pipeline scripts for different types of tasks.

script: pipeline-union-tag-value.py

5. Union Task Joining Tables Along axis-1:

script: pipeline-union-axis-1.py

Users can run a pipeline job directly:

Expand Down
80 changes: 80 additions & 0 deletions examples/pipeline/union/pipeline-union-axis-1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse

from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataTransform
from pipeline.component import Reader
from pipeline.component import Union
from pipeline.interface import Data
from pipeline.utils.tools import load_job_config


def main(config="../../config.yaml", namespace=""):
    """Run a guest-only column-wise (axis=1) union pipeline demo.

    Both motor tables are loaded on the guest side; their transformed
    outputs are concatenated along axis 1 by the Union component.

    Parameters
    ----------
    config : str or config object
        Path to the job config yaml, or an already-loaded config object.
    namespace : str
        Suffix appended to the data namespace, for parallel test runs.
    """
    # Accept either a path or a pre-loaded config object.
    if isinstance(config, str):
        config = load_job_config(config)
    guest = config.parties.guest[0]

    guest_train_data = {"name": "motor_hetero_guest", "namespace": f"experiment{namespace}"}
    host_train_data = {"name": "motor_hetero_host", "namespace": f"experiment{namespace}"}

    pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest)

    # Two readers, one per input table (both bound on the guest party).
    reader_0 = Reader(name="reader_0")
    reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)
    reader_1 = Reader(name="reader_1")
    reader_1.get_party_instance(role='guest', party_id=guest).component_param(table=host_train_data)

    # First table carries the regression label; the second is features only.
    data_transform_0 = DataTransform(name="data_transform_0")
    data_transform_0.get_party_instance(role='guest', party_id=guest).component_param(
        with_label=True, output_format="dense", label_name="motor_speed", label_type="float")
    data_transform_1 = DataTransform(name="data_transform_1")
    data_transform_1.get_party_instance(role='guest', party_id=guest).component_param(
        with_label=False, output_format="dense")

    # axis=1 joins the two tables column-wise.
    union_0 = Union(name="union_0", axis=1)

    pipeline.add_component(reader_0)
    pipeline.add_component(reader_1)
    pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
    pipeline.add_component(data_transform_1, data=Data(data=reader_1.output.data))
    pipeline.add_component(
        union_0,
        data=Data(data=[data_transform_0.output.data, data_transform_1.output.data]))

    pipeline.compile()
    pipeline.fit()


if __name__ == "__main__":
    # CLI entry point: optionally accept a job-config path.
    arg_parser = argparse.ArgumentParser("PIPELINE DEMO")
    arg_parser.add_argument("-config", type=str,
                            help="config file")
    cli_args = arg_parser.parse_args()
    if cli_args.config is None:
        # No path given: fall back to the default config location.
        main()
    else:
        main(cli_args.config)
19 changes: 19 additions & 0 deletions examples/pipeline/union/union_testsuite.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@
"table_name": "tag_value_3",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/motor_hetero_guest.csv",
"head": 1,
"partition": 16,
"table_name": "motor_hetero_guest",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/motor_hetero_host.csv",
"head": 1,
"partition": 16,
"table_name": "motor_hetero_host",
"namespace": "experiment",
"role": "guest_0"
}
],
"pipeline_tasks": {
Expand All @@ -53,6 +69,9 @@
},
"union-tag": {
"script": "./pipeline-union-tag-value.py"
},
"union-axis-1": {
"script": "./pipeline-union-axis-1.py"
}
}
}
41 changes: 26 additions & 15 deletions python/fate_client/pipeline/param/feature_imputation_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# limitations under the License.
#

from pipeline.param import consts
from pipeline.param.base_param import BaseParam


Expand All @@ -27,38 +28,41 @@ class FeatureImputationParam(BaseParam):
Parameters
----------
default_value : None or single object type or list
default_value : None or single object type
the value to replace missing value.
if None, it will use default value defined in federatedml/feature/imputer.py,
if single object, will fill missing value with this object,
if list, it's length should be the same as input data' feature dimension,
means that if some column happens to have missing values, it will replace it
the value by element in the identical position of this list.
missing_fill_method : [None, 'min', 'max', 'mean', 'designated']
if single object, will fill missing value with this object
col_default_value: None or dict of (column name, default_value) pairs
specifies default value for each column;
any column to be filled with 'designated' but not specified in col_default_value will take default_value
missing_fill_method : [None, 'min', 'max', 'mean', 'designated', 'median', 'mode']
the method to replace missing value
col_missing_fill_method: None or dict of (column name, missing_fill_method) pairs
specifies method to replace missing value for each column;
any column not specified will take missing_fill_method,
if missing_fill_method is None, unspecified column will not be imputed;
missing_impute : None or list
element of list can be any type, or auto generated if value is None, define which values to be consider as missing, default: None
element of list can be any type, or auto generated if value is None, define which values to be considered as missing, default: None
error: float, 0 <= error < 1 default: 0.0001
The error of tolerance for quantile calculation when computing median
multi_mode: string, choose between 'random' and 'raise', default 'random'
if 'random', randomly choose one of the modes as fill value, if 'raise', raise error when there are multiple modes
need_run: bool, default True
need run or not
"""

def __init__(self, default_value=0, missing_fill_method=None, col_missing_fill_method=None,
missing_impute=None, need_run=True):
def __init__(self, default_value=0, col_default_value=None, missing_fill_method=None, col_missing_fill_method=None,
             missing_impute=None, need_run=True, error=consts.DEFAULT_RELATIVE_ERROR, multi_mode='random'):
    """Store feature-imputation parameters; validation is done in check()."""
    super(FeatureImputationParam, self).__init__()
    # fallback fill value for 'designated' columns absent from col_default_value
    self.default_value = default_value
    # per-column designated fill values: {column name: value} or None
    self.col_default_value = col_default_value
    # global fill method; None means unspecified columns are not imputed
    self.missing_fill_method = missing_fill_method
    # per-column fill methods: {column name: method} or None
    self.col_missing_fill_method = col_missing_fill_method
    # values to treat as missing; None lets the imputer auto-generate them
    self.missing_impute = missing_impute
    self.need_run = need_run
    # relative error tolerance for the quantile-based 'median' method
    self.error = error
    # tie-breaking for 'mode': 'random' picks one mode, 'raise' errors out
    self.multi_mode = multi_mode

def check(self):

Expand All @@ -68,16 +72,23 @@ def check(self):

if self.missing_fill_method is not None:
self.missing_fill_method = self.check_and_change_lower(self.missing_fill_method,
['min', 'max', 'mean', 'designated'],
['min', 'max', 'mean',
'designated', 'median', 'mode'],
f"{descr}missing_fill_method ")
if self.col_default_value:
if not isinstance(self.col_default_value, dict):
raise ValueError(f"{descr}col_default_value should be a dict")
for k, v in self.col_default_value.items():
if not isinstance(k, str):
raise ValueError(f"{descr}col_default_value should contain str key(s) only")
if self.col_missing_fill_method:
if not isinstance(self.col_missing_fill_method, dict):
raise ValueError(f"{descr}col_missing_fill_method should be a dict")
for k, v in self.col_missing_fill_method.items():
if not isinstance(k, str):
raise ValueError(f"{descr}col_missing_fill_method should contain str key(s) only")
v = self.check_and_change_lower(v,
['min', 'max', 'mean', 'designated'],
['min', 'max', 'mean', 'designated', 'median', 'mode'],
f"per column method specified in {descr} col_missing_fill_method dict")
self.col_missing_fill_method[k] = v
if self.missing_impute:
Expand Down
9 changes: 8 additions & 1 deletion python/fate_client/pipeline/param/union_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,20 @@ class UnionParam(BaseParam):
keep_duplicate: bool, default False
Whether to keep entries with duplicated keys. If set to True, a new id will be generated for duplicated entry in the format {id}_{table_name}.
axis: int, default 0
The axis to union tables. 0 for row-wise union and 1 for column-wise union.
unmatched_id: str, choose from {'raise', 'ignore'}, default 'raise'
Whether to ignore unmatched id in the union process for axis=1. If set to 'ignore', the result will only contain entries with id that exists in all tables.
Otherwise, if unmatched id exists, an error will be raised.
"""

def __init__(self, need_run=True, allow_missing=False, keep_duplicate=False):
def __init__(self, need_run=True, allow_missing=False, keep_duplicate=False, axis=0, unmatched_id='raise'):
    """Store union parameters; validation is done in check()."""
    super().__init__()
    self.need_run = need_run
    self.allow_missing = allow_missing
    # when True, duplicated keys get a new id formatted as {id}_{table_name}
    self.keep_duplicate = keep_duplicate
    # 0 = row-wise union, 1 = column-wise union
    self.axis = axis
    # axis=1 only: 'raise' errors on unmatched ids,
    # 'ignore' keeps only ids present in all tables
    self.unmatched_id = unmatched_id

def check(self):
descr = "union param's "
Expand Down
Loading

0 comments on commit 7b4374e

Please sign in to comment.