-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Decouple transformation types from ODFVs #3949
Changes from all commits
5a5278d
f0180e6
b3221fb
9b936f1
2f0c67c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
from feast.feature_view import FeatureView | ||
from feast.feature_view_projection import FeatureViewProjection | ||
from feast.field import Field, from_value_type | ||
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation | ||
from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( | ||
OnDemandFeatureView as OnDemandFeatureViewProto, | ||
) | ||
|
@@ -24,9 +25,6 @@ | |
OnDemandFeatureViewSpec, | ||
OnDemandSource, | ||
) | ||
from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( | ||
UserDefinedFunction as UserDefinedFunctionProto, | ||
) | ||
from feast.type_map import ( | ||
feast_value_type_to_pandas_type, | ||
python_type_to_feast_value_type, | ||
|
@@ -51,8 +49,7 @@ class OnDemandFeatureView(BaseFeatureView): | |
sources with type FeatureViewProjection. | ||
source_request_sources: A map from input source names to the actual input | ||
sources with type RequestSource. | ||
udf: The user defined transformation function, which must take pandas dataframes | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we still keep the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, that's a good idea There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool. nit, maybe keep the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's in the comment for the init method. Do you mean the comment that lists the attributes? it's no longer an attribute, so probably shouldn't be there. It's being converted to transformation in init method and discarded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah that make sense |
||
as inputs. | ||
transformation: The user defined transformation. | ||
description: A human-readable description. | ||
tags: A dictionary of key-value pairs to store arbitrary metadata. | ||
owner: The owner of the on demand feature view, typically the email of the primary | ||
|
@@ -63,8 +60,7 @@ class OnDemandFeatureView(BaseFeatureView): | |
features: List[Field] | ||
source_feature_view_projections: Dict[str, FeatureViewProjection] | ||
source_request_sources: Dict[str, RequestSource] | ||
udf: FunctionType | ||
udf_string: str | ||
transformation: Union[OnDemandPandasTransformation] | ||
description: str | ||
tags: Dict[str, str] | ||
owner: str | ||
|
@@ -82,8 +78,9 @@ def __init__( # noqa: C901 | |
FeatureViewProjection, | ||
] | ||
], | ||
udf: FunctionType, | ||
udf: Optional[FunctionType] = None, | ||
udf_string: str = "", | ||
transformation: Optional[Union[OnDemandPandasTransformation]] = None, | ||
description: str = "", | ||
tags: Optional[Dict[str, str]] = None, | ||
owner: str = "", | ||
|
@@ -98,9 +95,10 @@ def __init__( # noqa: C901 | |
sources: A map from input source names to the actual input sources, which may be | ||
feature views, or request data sources. These sources serve as inputs to the udf, | ||
which will refer to them by name. | ||
udf: The user defined transformation function, which must take pandas | ||
udf (deprecated): The user defined transformation function, which must take pandas | ||
dataframes as inputs. | ||
udf_string: The source code version of the udf (for diffing and displaying in Web UI) | ||
udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) | ||
transformation: The user defined transformation. | ||
description (optional): A human-readable description. | ||
tags (optional): A dictionary of key-value pairs to store arbitrary metadata. | ||
owner (optional): The owner of the on demand feature view, typically the email | ||
|
@@ -114,6 +112,18 @@ def __init__( # noqa: C901 | |
owner=owner, | ||
) | ||
|
||
if not transformation: | ||
if udf: | ||
warnings.warn( | ||
"udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.", | ||
DeprecationWarning, | ||
) | ||
transformation = OnDemandPandasTransformation(udf, udf_string) | ||
else: | ||
raise Exception( | ||
"OnDemandFeatureView needs to be initialized with either transformation or udf arguments" | ||
) | ||
|
||
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} | ||
self.source_request_sources: Dict[str, RequestSource] = {} | ||
for odfv_source in sources: | ||
|
@@ -126,8 +136,7 @@ def __init__( # noqa: C901 | |
odfv_source.name | ||
] = odfv_source.projection | ||
|
||
self.udf = udf # type: ignore | ||
self.udf_string = udf_string | ||
self.transformation = transformation | ||
|
||
@property | ||
def proto_class(self) -> Type[OnDemandFeatureViewProto]: | ||
|
@@ -139,8 +148,7 @@ def __copy__(self): | |
schema=self.features, | ||
sources=list(self.source_feature_view_projections.values()) | ||
+ list(self.source_request_sources.values()), | ||
udf=self.udf, | ||
udf_string=self.udf_string, | ||
transformation=self.transformation, | ||
description=self.description, | ||
tags=self.tags, | ||
owner=self.owner, | ||
|
@@ -161,8 +169,7 @@ def __eq__(self, other): | |
self.source_feature_view_projections | ||
!= other.source_feature_view_projections | ||
or self.source_request_sources != other.source_request_sources | ||
or self.udf_string != other.udf_string | ||
or self.udf.__code__.co_code != other.udf.__code__.co_code | ||
or self.transformation != other.transformation | ||
): | ||
return False | ||
|
||
|
@@ -200,11 +207,9 @@ def to_proto(self) -> OnDemandFeatureViewProto: | |
name=self.name, | ||
features=[feature.to_proto() for feature in self.features], | ||
sources=sources, | ||
user_defined_function=UserDefinedFunctionProto( | ||
name=self.udf.__name__, | ||
body=dill.dumps(self.udf, recurse=True), | ||
body_text=self.udf_string, | ||
), | ||
user_defined_function=self.transformation.to_proto() | ||
if type(self.transformation) == OnDemandPandasTransformation | ||
else None, | ||
description=self.description, | ||
tags=self.tags, | ||
owner=self.owner, | ||
|
@@ -243,6 +248,16 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): | |
RequestSource.from_proto(on_demand_source.request_data_source) | ||
) | ||
|
||
if ( | ||
on_demand_feature_view_proto.spec.WhichOneof("transformation") | ||
== "user_defined_function" | ||
): | ||
transformation = OnDemandPandasTransformation.from_proto( | ||
on_demand_feature_view_proto.spec.user_defined_function | ||
) | ||
else: | ||
raise Exception("At least one transformation type needs to be provided") | ||
|
||
on_demand_feature_view_obj = cls( | ||
name=on_demand_feature_view_proto.spec.name, | ||
schema=[ | ||
|
@@ -253,10 +268,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): | |
for feature in on_demand_feature_view_proto.spec.features | ||
], | ||
sources=sources, | ||
udf=dill.loads( | ||
on_demand_feature_view_proto.spec.user_defined_function.body | ||
), | ||
udf_string=on_demand_feature_view_proto.spec.user_defined_function.body_text, | ||
transformation=transformation, | ||
description=on_demand_feature_view_proto.spec.description, | ||
tags=dict(on_demand_feature_view_proto.spec.tags), | ||
owner=on_demand_feature_view_proto.spec.owner, | ||
|
@@ -315,7 +327,8 @@ def get_transformed_features_df( | |
columns_to_cleanup.append(full_feature_ref) | ||
|
||
# Compute transformed values and apply to each result row | ||
df_with_transformed_features = self.udf.__call__(df_with_features) | ||
|
||
df_with_transformed_features = self.transformation.transform(df_with_features) | ||
|
||
# Work out whether the correct columns names are used. | ||
rename_columns: Dict[str, str] = {} | ||
|
@@ -335,7 +348,7 @@ def get_transformed_features_df( | |
df_with_features.drop(columns=columns_to_cleanup, inplace=True) | ||
return df_with_transformed_features.rename(columns=rename_columns) | ||
|
||
def infer_features(self): | ||
def infer_features(self) -> None: | ||
""" | ||
Infers the set of features associated to this feature view from the input source. | ||
|
||
|
@@ -365,7 +378,7 @@ def infer_features(self): | |
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) | ||
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None | ||
df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype) | ||
output_df: pd.DataFrame = self.udf.__call__(df) | ||
output_df: pd.DataFrame = self.transformation.transform(df) | ||
inferred_features = [] | ||
for f, dt in zip(output_df.columns, output_df.dtypes): | ||
inferred_features.append( | ||
|
@@ -396,7 +409,9 @@ def infer_features(self): | |
) | ||
|
||
@staticmethod | ||
def get_requested_odfvs(feature_refs, project, registry): | ||
def get_requested_odfvs( | ||
feature_refs, project, registry | ||
) -> List["OnDemandFeatureView"]: | ||
all_on_demand_feature_views = registry.list_on_demand_feature_views( | ||
project, allow_cache=True | ||
) | ||
|
@@ -438,7 +453,7 @@ def on_demand_feature_view( | |
of the primary maintainer. | ||
""" | ||
|
||
def mainify(obj): | ||
def mainify(obj) -> None: | ||
# Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same | ||
# name as the original file defining the ODFV. | ||
if obj.__module__ != "__main__": | ||
|
@@ -447,15 +462,17 @@ def mainify(obj): | |
def decorator(user_function): | ||
udf_string = dill.source.getsource(user_function) | ||
mainify(user_function) | ||
|
||
transformation = OnDemandPandasTransformation(user_function, udf_string) | ||
|
||
on_demand_feature_view_obj = OnDemandFeatureView( | ||
name=user_function.__name__, | ||
sources=sources, | ||
schema=schema, | ||
udf=user_function, | ||
transformation=transformation, | ||
description=description, | ||
tags=tags, | ||
owner=owner, | ||
udf_string=udf_string, | ||
) | ||
functools.update_wrapper( | ||
wrapper=on_demand_feature_view_obj, wrapped=user_function | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from types import FunctionType | ||
|
||
import dill | ||
import pandas as pd | ||
|
||
from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( | ||
UserDefinedFunction as UserDefinedFunctionProto, | ||
) | ||
|
||
|
||
class OnDemandPandasTransformation: | ||
def __init__(self, udf: FunctionType, udf_string: str = ""): | ||
""" | ||
Creates an OnDemandPandasTransformation object. | ||
|
||
Args: | ||
udf: The user defined transformation function, which must take pandas | ||
dataframes as inputs. | ||
udf_string: The source code version of the udf (for diffing and displaying in Web UI) | ||
""" | ||
self.udf = udf | ||
self.udf_string = udf_string | ||
|
||
def transform(self, df: pd.DataFrame) -> pd.DataFrame: | ||
return self.udf.__call__(df) | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, OnDemandPandasTransformation): | ||
raise TypeError( | ||
"Comparisons should only involve OnDemandPandasTransformation class objects." | ||
) | ||
|
||
if not super().__eq__(other): | ||
return False | ||
|
||
if ( | ||
self.udf_string != other.udf_string | ||
or self.udf.__code__.co_code != other.udf.__code__.co_code | ||
): | ||
return False | ||
|
||
return True | ||
|
||
def to_proto(self) -> UserDefinedFunctionProto: | ||
return UserDefinedFunctionProto( | ||
name=self.udf.__name__, | ||
body=dill.dumps(self.udf, recurse=True), | ||
body_text=self.udf_string, | ||
) | ||
|
||
@classmethod | ||
def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): | ||
return OnDemandPandasTransformation( | ||
udf=dill.loads(user_defined_function_proto.body), | ||
udf_string=user_defined_function_proto.body_text, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't be backwards compatible right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty sure it is. The binary format will be the same and there would have been no way for multiple fields to have been set with previous version, as it's just a single field. Googled a bit just now.. the very last sentence in this blog post seems to also indicate this should be fine.