From c58ef74c18554d823f7957bf602184c744bb7ed7 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 24 Mar 2024 13:24:23 -0400 Subject: [PATCH] feat: Updating protos to separate transformation (#4018) * feat: updating protos to separate transformation Signed-off-by: Francisco Javier Arceo * fixed stuff...i think Signed-off-by: Francisco Javier Arceo * updated tests and registry diff function Signed-off-by: Francisco Javier Arceo * updated base registry Signed-off-by: Francisco Javier Arceo * updated react component Signed-off-by: Francisco Javier Arceo * formatted Signed-off-by: Francisco Javier Arceo * updated stream feature view proto Signed-off-by: Francisco Javier Arceo * making the proto changes backwards compatable Signed-off-by: Francisco Javier Arceo * trying to make this backwards compatible Signed-off-by: Francisco Javier Arceo * caught a bug and fixed the linter * actually linted Signed-off-by: Francisco Javier Arceo * updated ui component Signed-off-by: Francisco Javier Arceo * accidentally commented out fixtures * Updated Signed-off-by: Francisco Javier Arceo * incrementing protos * updated tests Signed-off-by: Francisco Javier Arceo * fixed linting issue and made backwards compatible Signed-off-by: Francisco Javier Arceo * added more tests Signed-off-by: Francisco Javier Arceo --------- Signed-off-by: Francisco Javier Arceo --- protos/feast/core/OnDemandFeatureView.proto | 14 +- protos/feast/core/StreamFeatureView.proto | 7 +- protos/feast/core/Transformation.proto | 33 +++ sdk/python/feast/diff/registry_diff.py | 21 +- .../feast/infra/registry/base_registry.py | 14 +- sdk/python/feast/on_demand_feature_view.py | 54 +++- .../feast/on_demand_pandas_transformation.py | 4 +- .../on_demand_substrait_transformation.py | 4 +- sdk/python/feast/stream_feature_view.py | 33 ++- .../tests/unit/diff/test_registry_diff.py | 5 +- sdk/python/tests/unit/test_feature_views.py | 205 +------------- .../tests/unit/test_on_demand_feature_view.py | 66 +++++ .../tests/unit/test_stream_feature_view.py | 252 ++++++++++++++++++ .../OnDemandFeatureViewOverviewTab.tsx | 2 +- 14 files changed, 482 insertions(+), 232 deletions(-) create mode 100644 protos/feast/core/Transformation.proto create mode 100644 sdk/python/tests/unit/test_stream_feature_view.py diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index c43b33c1d2..cd3ceba150 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -27,6 +27,7 @@ import "feast/core/FeatureView.proto"; import "feast/core/FeatureViewProjection.proto"; import "feast/core/Feature.proto"; import "feast/core/DataSource.proto"; +import "feast/core/Transformation.proto"; message OnDemandFeatureView { // User-specified specifications of this feature view. @@ -49,9 +50,11 @@ message OnDemandFeatureViewSpec { map sources = 4; oneof transformation { - UserDefinedFunction user_defined_function = 5; - OnDemandSubstraitTransformation on_demand_substrait_transformation = 9; + UserDefinedFunction user_defined_function = 5 [deprecated = true]; + OnDemandSubstraitTransformation on_demand_substrait_transformation = 9 [deprecated = true]; } + // Oneof with {user_defined_function, on_demand_substrait_transformation} + FeatureTransformationV2 feature_transformation = 10; // Description of the on demand feature view. string description = 6; @@ -61,6 +64,7 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; + string mode = 11; } message OnDemandFeatureViewMeta { @@ -81,6 +85,8 @@ message OnDemandSource { // Serialized representation of python function. message UserDefinedFunction { + option deprecated = true; + // The function name string name = 1; @@ -92,5 +98,7 @@ message UserDefinedFunction { } message OnDemandSubstraitTransformation { + option deprecated = true; + bytes substrait_plan = 1; -} \ No newline at end of file +} diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index 3181bdf360..cb7da0faf3 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -29,6 +29,7 @@ import "feast/core/FeatureView.proto"; import "feast/core/Feature.proto"; import "feast/core/DataSource.proto"; import "feast/core/Aggregation.proto"; +import "feast/core/Transformation.proto"; message StreamFeatureView { // User-specified specifications of this feature view. @@ -77,7 +78,8 @@ message StreamFeatureViewSpec { bool online = 12; // Serialized function that is encoded in the streamfeatureview - UserDefinedFunction user_defined_function = 13; + UserDefinedFunction user_defined_function = 13 [deprecated = true]; + // Mode of execution string mode = 14; @@ -87,5 +89,8 @@ message StreamFeatureViewSpec { // Timestamp field for aggregation string timestamp_field = 16; + + // Oneof with {user_defined_function, on_demand_substrait_transformation} + FeatureTransformationV2 feature_transformation = 17; } diff --git a/protos/feast/core/Transformation.proto b/protos/feast/core/Transformation.proto new file mode 100644 index 0000000000..cde2833fa4 --- /dev/null +++ b/protos/feast/core/Transformation.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; +package feast.core; + +option go_package = "github.com/feast-dev/feast/go/protos/feast/core"; +option java_outer_classname = "FeatureTransformationProto"; +option java_package = "feast.proto.core"; + +import "google/protobuf/duration.proto"; + +// Serialized representation of python function. +message UserDefinedFunctionV2 { + // The function name + string name = 1; + + // The python-syntax function body (serialized by dill) + bytes body = 2; + + // The string representation of the udf + string body_text = 3; +} + +// A feature transformation executed as a user-defined function +message FeatureTransformationV2 { + // Note this Transformation starts at 5 for backwards compatibility + oneof transformation { + UserDefinedFunctionV2 user_defined_function = 1; + OnDemandSubstraitTransformationV2 on_demand_substrait_transformation = 2; + } +} + +message OnDemandSubstraitTransformationV2 { + bytes substrait_plan = 1; +} diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 15f880e392..120f5d697a 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -1,3 +1,4 @@ +import warnings from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, cast @@ -144,11 +145,25 @@ def diff_registry_objects( if _field.name in FIELDS_TO_IGNORE: continue elif getattr(current_spec, _field.name) != getattr(new_spec, _field.name): - if _field.name == "user_defined_function": + # TODO: Delete "transformation" after we've safely deprecated it from the proto + if _field.name in ["transformation", "feature_transformation"]: + warnings.warn( + "transformation will be deprecated in the future please use feature_transformation instead.", + DeprecationWarning, + ) current_spec = cast(OnDemandFeatureViewSpec, current_spec) new_spec = cast(OnDemandFeatureViewSpec, new_spec) - current_udf = current_spec.user_defined_function - new_udf = new_spec.user_defined_function + # Check if the old proto is populated and use that if it is + deprecated_udf = current_spec.user_defined_function + feature_transformation_udf = ( + current_spec.feature_transformation.user_defined_function + ) + current_udf = ( + deprecated_udf + if deprecated_udf.body_text != "" + else feature_transformation_udf + ) + new_udf = new_spec.feature_transformation.user_defined_function for _udf_field in current_udf.DESCRIPTOR.fields: if _udf_field.name == "body": continue diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 9ee3bbbabc..d3d82a80b0 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json +import warnings from abc import ABC, abstractmethod from collections import defaultdict from datetime import datetime @@ -662,10 +663,16 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: key=lambda on_demand_feature_view: on_demand_feature_view.name, ): odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto()) - - odfv_dict["spec"]["userDefinedFunction"][ + # We are logging a warning because the registry object may be read from a proto that is not updated + # i.e., we have to submit dual writes but in order to ensure the read behavior succeeds we have to load + # both objects to compare any changes in the registry + warnings.warn( + "We will be deprecating the usage of spec.userDefinedFunction in a future release please upgrade cautiously.", + DeprecationWarning, + ) + odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][ "body" - ] = on_demand_feature_view.transformation.udf_string + ] = on_demand_feature_view.feature_transformation.udf_string registry_dict["onDemandFeatureViews"].append(odfv_dict) for request_feature_view in sorted( self.list_request_feature_views(project=project), @@ -684,6 +691,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: "body" ] = stream_feature_view.udf_string registry_dict["streamFeatureViews"].append(sfv_dict) + for saved_dataset in sorted( self.list_saved_datasets(project=project), key=lambda item: item.name ): diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 586286a3d4..61e55bb0c0 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -27,6 +27,12 @@ OnDemandFeatureViewSpec, OnDemandSource, ) +from feast.protos.feast.core.Transformation_pb2 import ( + FeatureTransformationV2 as FeatureTransformationProto, +) +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, +) from feast.type_map import ( feast_value_type_to_pandas_type, python_type_to_feast_value_type, @@ -63,6 +69,7 @@ class OnDemandFeatureView(BaseFeatureView): source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] transformation: Union[OnDemandPandasTransformation] + feature_transformation: Union[OnDemandPandasTransformation] description: str tags: Dict[str, str] owner: str @@ -83,6 +90,7 @@ def __init__( # noqa: C901 udf: Optional[FunctionType] = None, udf_string: str = "", transformation: Optional[Union[OnDemandPandasTransformation]] = None, + feature_transformation: Optional[Union[OnDemandPandasTransformation]] = None, description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -101,6 +109,7 @@ def __init__( # noqa: C901 dataframes as inputs. udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) transformation: The user defined transformation. + feature_transformation: The user defined transformation. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -139,6 +148,7 @@ def __init__( # noqa: C901 ] = odfv_source.projection self.transformation = transformation + self.feature_transformation = self.transformation @property def proto_class(self) -> Type[OnDemandFeatureViewProto]: @@ -151,6 +161,7 @@ def __copy__(self): sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), transformation=self.transformation, + feature_transformation=self.transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -172,6 +183,7 @@ def __eq__(self, other): != other.source_feature_view_projections or self.source_request_sources != other.source_request_sources or self.transformation != other.transformation + or self.feature_transformation != other.feature_transformation ): return False @@ -205,16 +217,19 @@ def to_proto(self) -> OnDemandFeatureViewProto: request_data_source=request_sources.to_proto() ) - spec = OnDemandFeatureViewSpec( - name=self.name, - features=[feature.to_proto() for feature in self.features], - sources=sources, + feature_transformation = FeatureTransformationProto( user_defined_function=self.transformation.to_proto() if type(self.transformation) == OnDemandPandasTransformation else None, - on_demand_substrait_transformation=self.transformation.to_proto() # type: ignore + on_demand_substrait_transformation=self.transformation.to_proto() if type(self.transformation) == OnDemandSubstraitTransformation - else None, + else None, # type: ignore + ) + spec = OnDemandFeatureViewSpec( + name=self.name, + features=[feature.to_proto() for feature in self.features], + sources=sources, + feature_transformation=feature_transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -254,18 +269,37 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ) if ( - on_demand_feature_view_proto.spec.WhichOneof("transformation") + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( + "transformation" + ) == "user_defined_function" + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + != "" ): transformation = OnDemandPandasTransformation.from_proto( - on_demand_feature_view_proto.spec.user_defined_function + on_demand_feature_view_proto.spec.feature_transformation.user_defined_function ) elif ( - on_demand_feature_view_proto.spec.WhichOneof("transformation") + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( + "transformation" + ) == "on_demand_substrait_transformation" ): transformation = OnDemandSubstraitTransformation.from_proto( - on_demand_feature_view_proto.spec.on_demand_substrait_transformation + on_demand_feature_view_proto.spec.feature_transformation.on_demand_substrait_transformation + ) + elif ( + hasattr(on_demand_feature_view_proto.spec, "user_defined_function") + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + == "" + ): + backwards_compatible_udf = UserDefinedFunctionProto( + name=on_demand_feature_view_proto.spec.user_defined_function.name, + body=on_demand_feature_view_proto.spec.user_defined_function.body, + body_text=on_demand_feature_view_proto.spec.user_defined_function.body_text, + ) + transformation = OnDemandPandasTransformation.from_proto( + user_defined_function_proto=backwards_compatible_udf, ) else: raise Exception("At least one transformation type needs to be provided") diff --git a/sdk/python/feast/on_demand_pandas_transformation.py b/sdk/python/feast/on_demand_pandas_transformation.py index 32cb44b429..48f5263051 100644 --- a/sdk/python/feast/on_demand_pandas_transformation.py +++ b/sdk/python/feast/on_demand_pandas_transformation.py @@ -3,8 +3,8 @@ import dill import pandas as pd -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( - UserDefinedFunction as UserDefinedFunctionProto, +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, ) diff --git a/sdk/python/feast/on_demand_substrait_transformation.py b/sdk/python/feast/on_demand_substrait_transformation.py index 4e92e77dc8..0666739125 100644 --- a/sdk/python/feast/on_demand_substrait_transformation.py +++ b/sdk/python/feast/on_demand_substrait_transformation.py @@ -2,8 +2,8 @@ import pyarrow import pyarrow.substrait as substrait # type: ignore # noqa -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( - OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto, +from feast.protos.feast.core.Transformation_pb2 import ( + OnDemandSubstraitTransformationV2 as OnDemandSubstraitTransformationProto, ) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 13abbc5e28..e8741a75fe 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -15,6 +15,7 @@ from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field +from feast.on_demand_pandas_transformation import OnDemandPandasTransformation from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( UserDefinedFunction as UserDefinedFunctionProto, @@ -25,6 +26,12 @@ from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureViewSpec as StreamFeatureViewSpecProto, ) +from feast.protos.feast.core.Transformation_pb2 import ( + FeatureTransformationV2 as FeatureTransformationProto, +) +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProtoV2, +) warnings.simplefilter("once", RuntimeWarning) @@ -73,6 +80,7 @@ class StreamFeatureView(FeatureView): materialization_intervals: List[Tuple[datetime, datetime]] udf: Optional[FunctionType] udf_string: Optional[str] + feature_transformation: Optional[OnDemandPandasTransformation] def __init__( self, @@ -91,6 +99,7 @@ def __init__( timestamp_field: Optional[str] = "", udf: Optional[FunctionType] = None, udf_string: Optional[str] = "", + feature_transformation: Optional[Union[OnDemandPandasTransformation]] = None, ): if not flags_helper.is_test(): warnings.warn( @@ -118,6 +127,7 @@ def __init__( self.timestamp_field = timestamp_field or "" self.udf = udf self.udf_string = udf_string + self.feature_transformation = feature_transformation super().__init__( name=name, @@ -171,19 +181,30 @@ def to_proto(self): stream_source_proto = self.stream_source.to_proto() stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}" - udf_proto = None + udf_proto, feature_transformation = None, None if self.udf: udf_proto = UserDefinedFunctionProto( name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True), body_text=self.udf_string, ) + udf_proto_v2 = UserDefinedFunctionProtoV2( + name=self.udf.__name__, + body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, + ) + + feature_transformation = FeatureTransformationProto( + user_defined_function=udf_proto_v2, + ) + spec = StreamFeatureViewSpecProto( name=self.name, entities=self.entities, entity_columns=[field.to_proto() for field in self.entity_columns], features=[field.to_proto() for field in self.schema], user_defined_function=udf_proto, + feature_transformation=feature_transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -220,6 +241,11 @@ def from_proto(cls, sfv_proto): if sfv_proto.spec.HasField("user_defined_function") else None ) + feature_transformation = ( + sfv_proto.spec.feature_transformation.user_defined_function.body_text + if sfv_proto.spec.HasField("feature_transformation") + else None + ) stream_feature_view = cls( name=sfv_proto.spec.name, description=sfv_proto.spec.description, @@ -238,6 +264,7 @@ def from_proto(cls, sfv_proto): mode=sfv_proto.spec.mode, udf=udf, udf_string=udf_string, + feature_transformation=feature_transformation, aggregations=[ Aggregation.from_proto(agg_proto) for agg_proto in sfv_proto.spec.aggregations @@ -294,6 +321,7 @@ def __copy__(self): timestamp_field=self.timestamp_field, source=self.stream_source if self.stream_source else self.batch_source, udf=self.udf, + feature_transformation=self.feature_transformation, ) fv.entities = self.entities fv.features = copy.copy(self.features) @@ -343,6 +371,9 @@ def decorator(user_function): schema=schema, udf=user_function, udf_string=udf_string, + feature_transformation=OnDemandPandasTransformation( + user_function, udf_string + ), description=description, tags=tags, online=online, diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index ce40295f8b..c209f1e0e0 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -137,13 +137,14 @@ def post_changed(inputs: pd.DataFrame) -> pd.DataFrame: # if no code is changed assert len(feast_object_diffs.feast_object_property_diffs) == 3 assert feast_object_diffs.feast_object_property_diffs[0].property_name == "name" + # Note we should only now be looking at changes for the feature_transformation field assert ( feast_object_diffs.feast_object_property_diffs[1].property_name - == "user_defined_function.name" + == "feature_transformation.name" ) assert ( feast_object_diffs.feast_object_property_diffs[2].property_name - == "user_defined_function.body_text" + == "feature_transformation.body_text" ) diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 2ad9680703..0220d1a8a9 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -1,22 +1,16 @@ -import copy from datetime import timedelta import pytest from typeguard import TypeCheckError -from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat -from feast.data_source import KafkaSource, PushSource +from feast.data_source import KafkaSource from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.protos.feast.core.StreamFeatureView_pb2 import ( - StreamFeatureView as StreamFeatureViewProto, -) from feast.protos.feast.types.Value_pb2 import ValueType -from feast.stream_feature_view import StreamFeatureView, stream_feature_view from feast.types import Float32 @@ -65,169 +59,10 @@ def test_create_batch_feature_view(): ) -def test_create_stream_feature_view(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - StreamFeatureView( - name="test kafka stream feature view", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - - push_source = PushSource( - name="push source", batch_source=FileSource(path="some path") - ) - StreamFeatureView( - name="test push source feature view", - entities=[], - ttl=timedelta(days=30), - source=push_source, - aggregations=[], - ) - - with pytest.raises(TypeError): - StreamFeatureView( - name="test batch feature view", - entities=[], - ttl=timedelta(days=30), - aggregations=[], - ) - - with pytest.raises(ValueError): - StreamFeatureView( - name="test batch feature view", - entities=[], - ttl=timedelta(days=30), - source=FileSource(path="some path"), - aggregations=[], - ) - - def simple_udf(x: int): return x + 3 -def test_stream_feature_view_serialization(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - sfv = StreamFeatureView( - name="test kafka stream feature view", - entities=[entity], - ttl=timedelta(days=30), - owner="test@example.com", - online=True, - schema=[Field(name="dummy_field", dtype=Float32)], - description="desc", - aggregations=[ - Aggregation( - column="dummy_field", - function="max", - time_window=timedelta(days=1), - ) - ], - timestamp_field="event_timestamp", - mode="spark", - source=stream_source, - udf=simple_udf, - tags={}, - ) - - sfv_proto = sfv.to_proto() - - new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) - assert new_sfv == sfv - - -def test_stream_feature_view_udfs(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - @stream_feature_view( - entities=[entity], - ttl=timedelta(days=30), - owner="test@example.com", - online=True, - schema=[Field(name="dummy_field", dtype=Float32)], - description="desc", - aggregations=[ - Aggregation( - column="dummy_field", - function="max", - time_window=timedelta(days=1), - ) - ], - timestamp_field="event_timestamp", - source=stream_source, - ) - def pandas_udf(pandas_df): - import pandas as pd - - assert type(pandas_df) == pd.DataFrame - df = pandas_df.transform(lambda x: x + 10, axis=1) - return df - - import pandas as pd - - df = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]}) - sfv = pandas_udf - sfv_proto = sfv.to_proto() - new_sfv = StreamFeatureView.from_proto(sfv_proto) - new_df = new_sfv.udf(df) - - expected_df = pd.DataFrame({"A": [11, 12, 13], "B": [20, 30, 40]}) - - assert new_df.equals(expected_df) - - -def test_stream_feature_view_initialization_with_optional_fields_omitted(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - sfv = StreamFeatureView( - name="test kafka stream feature view", - entities=[entity], - schema=[], - description="desc", - timestamp_field="event_timestamp", - source=stream_source, - tags={}, - ) - sfv_proto = sfv.to_proto() - - new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) - assert new_sfv == sfv - - def test_hash(): file_source = FileSource(name="my-file-source", path="test.parquet") feature_view_1 = FeatureView( @@ -282,41 +117,3 @@ def test_hash(): def test_field_types(): with pytest.raises(TypeCheckError): Field(name="name", dtype=ValueType.INT32) - - -def test_stream_feature_view_proto_type(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - sfv = StreamFeatureView( - name="test stream featureview proto class", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - assert sfv.proto_class is StreamFeatureViewProto - - -def test_stream_feature_view_copy(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - sfv = StreamFeatureView( - name="test stream featureview proto class", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - assert sfv == copy.copy(sfv) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 66d02c65d1..b83449519f 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -129,3 +129,69 @@ def test_hash(): assert on_demand_feature_view_5.transformation == OnDemandPandasTransformation( udf2, "udf2 source code" ) + assert ( + on_demand_feature_view_5.feature_transformation + == on_demand_feature_view_5.transformation + ) + + +@pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") +def test_from_proto_backwards_compatable_udf(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + sources = [feature_view] + on_demand_feature_view = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + transformation=OnDemandPandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + ) + + # We need a proto with the "udf1 source code" in the user_defined_function.body_text + # and to populate it in feature_transformation + proto = on_demand_feature_view.to_proto() + assert ( + on_demand_feature_view.transformation.udf_string + == proto.spec.feature_transformation.user_defined_function.body_text + ) + # Because of the current set of code this is just confirming it is empty + assert proto.spec.user_defined_function.body_text == "" + assert proto.spec.user_defined_function.body == b"" + assert proto.spec.user_defined_function.name == "" + + # Assuming we pull it from the registry we set it to the feature_transformation proto values + proto.spec.user_defined_function.name = ( + proto.spec.feature_transformation.user_defined_function.name + ) + proto.spec.user_defined_function.body = ( + proto.spec.feature_transformation.user_defined_function.body + ) + proto.spec.user_defined_function.body_text = ( + proto.spec.feature_transformation.user_defined_function.body_text + ) + + # And now we're going to null the feature_transformation proto object before reserializing the entire proto + # proto.spec.user_defined_function.body_text = on_demand_feature_view.transformation.udf_string + proto.spec.feature_transformation.user_defined_function.name = "" + proto.spec.feature_transformation.user_defined_function.body = b"" + proto.spec.feature_transformation.user_defined_function.body_text = "" + + # And now we expect the to get the same object back under feature_transformation + reserialized_proto = OnDemandFeatureView.from_proto(proto) + assert ( + reserialized_proto.feature_transformation.udf_string + == on_demand_feature_view.feature_transformation.udf_string + ) diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py new file mode 100644 index 0000000000..b53f9a593a --- /dev/null +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -0,0 +1,252 @@ +import copy +from datetime import timedelta + +import pytest + +from feast.aggregation import Aggregation +from feast.batch_feature_view import BatchFeatureView +from feast.data_format import AvroFormat +from feast.data_source import KafkaSource, PushSource +from feast.entity import Entity +from feast.field import Field +from feast.infra.offline_stores.file_source import FileSource +from feast.protos.feast.core.StreamFeatureView_pb2 import ( + StreamFeatureView as StreamFeatureViewProto, +) +from feast.stream_feature_view import StreamFeatureView, stream_feature_view +from feast.types import Float32 + + +def test_create_batch_feature_view(): + batch_source = FileSource(path="some path") + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=batch_source, + ) + + with pytest.raises(TypeError): + BatchFeatureView( + name="test batch feature view", entities=[], ttl=timedelta(days=30) + ) + + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + with pytest.raises(ValueError): + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + ) + + +def test_create_stream_feature_view(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + StreamFeatureView( + name="test kafka stream feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + + push_source = PushSource( + name="push source", batch_source=FileSource(path="some path") + ) + StreamFeatureView( + name="test push source feature view", + entities=[], + ttl=timedelta(days=30), + source=push_source, + aggregations=[], + ) + + with pytest.raises(TypeError): + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + aggregations=[], + ) + + with pytest.raises(ValueError): + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=FileSource(path="some path"), + aggregations=[], + ) + + +def simple_udf(x: int): + return x + 3 + + +def test_stream_feature_view_serialization(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + + sfv_proto = sfv.to_proto() + + new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) + assert new_sfv == sfv + assert ( + sfv_proto.spec.feature_transformation.user_defined_function.name == "simple_udf" + ) + + +def test_stream_feature_view_udfs(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + @stream_feature_view( + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + source=stream_source, + ) + def pandas_udf(pandas_df): + import pandas as pd + + assert type(pandas_df) == pd.DataFrame + df = pandas_df.transform(lambda x: x + 10, axis=1) + return df + + import pandas as pd + + df = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]}) + sfv = pandas_udf + sfv_proto = sfv.to_proto() + new_sfv = StreamFeatureView.from_proto(sfv_proto) + new_df = new_sfv.udf(df) + + expected_df = pd.DataFrame({"A": [11, 12, 13], "B": [20, 30, 40]}) + + assert new_df.equals(expected_df) + + +def test_stream_feature_view_initialization_with_optional_fields_omitted(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity], + schema=[], + description="desc", + timestamp_field="event_timestamp", + source=stream_source, + tags={}, + ) + sfv_proto = sfv.to_proto() + + new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) + assert new_sfv == sfv + + +def test_stream_feature_view_proto_type(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + sfv = StreamFeatureView( + name="test stream featureview proto class", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + assert sfv.proto_class is StreamFeatureViewProto + + +def test_stream_feature_view_copy(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + sfv = StreamFeatureView( + name="test stream featureview proto class", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + assert sfv == copy.copy(sfv) diff --git a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx index ee8e41bbf6..aac3f6ac5b 100644 --- a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx +++ b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx @@ -57,7 +57,7 @@ const OnDemandFeatureViewOverviewTab = ({ - {data?.spec?.userDefinedFunction?.bodyText} + {data?.spec?.featureTransformation?.userDefinedFunction?.bodyText}