Commit

feat: Adding write capability to online store to on demand feature views (#4585)

* merged changes
* saving progress
* merged changes to odfv
* linted
* adding the test needed to show the expected behavior
* updated test case
* saving progress
* merging
* merged
* merged
* merging
* adding the entity keys for now to do retrieval
* adding entity to odfv
* checking in progress...getting closer
* may have to revert some of this...looks like the challenge is getting the entities correct when storing writes. just checking in progress
* moving things around to make it easier to debug
* debugging
* merged
* merging
* Rebasing and merging changes from other PR
* Merging changes continued
* update the _make_inference to include odfvs with writes in the update map
* have the table being written now...the create table happens in the SqliteOnlineStore.update() method
* checking in progress
* adding logs
* updating permissions
* going to error out on purpose
* adding unit test and merging changes
* almost got everything working and type validation behaving
* cleaned up and have tests behaving
* adding print
* removing print
* checking in progress
* updating test
* adding test
* linted and updated
* removed print
* updated tests to test actual behavior
* checking in progress
* changing typo
* updating test
* testing changes
* checking to see if thing still working
* removed print
* undo change for odfv file
* updated tests
* okay well have the unit test working
* type changes, hope i dont regret them
* updated stream feature view piece
* updated sfv ifelse
* removing print
* formatted and updated test
* resolving some linter errors
* fixed linter and formatting
* okay think it is working
* linter
* updated type map for integration tests
* updated local feature store test
* fixed local fs test
* chore: Updated snowflake test to be more explicit about post apply entity_columns return value (#4603)
* merging
* fixed test to entity_rows_to_read
* resolved inf conflicts
* lint
* Updated tests and lint, think I have everything working

---------

Signed-off-by: Francisco Javier Arceo <[email protected]>
franciscojavierarceo authored Oct 10, 2024
1 parent b5ab6c7 commit ef9e0bb
Showing 20 changed files with 737 additions and 107 deletions.
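Before the file-by-file diff, a minimal sketch of the workflow this commit enables: declare an OnDemandFeatureView that opts into online writes, apply it, and push transformed feature values straight to the online store. The entity, source, schema, file path, and transformation below are illustrative assumptions rather than code taken from this diff; the write_to_online_store flag mirrors the attribute name used in the changes but is assumed to be accepted by the decorator.

import pandas as pd
from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float64

# Hypothetical entity and source feature view.
driver = Entity(name="driver", join_keys=["driver_id"])

driver_stats_source = FileSource(
    name="driver_stats_source",
    path="data/driver_stats.parquet",  # assumed local file
    timestamp_field="event_timestamp",
)

driver_stats_fv = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    schema=[Field(name="conv_rate", dtype=Float64)],
    source=driver_stats_source,
)

@on_demand_feature_view(
    sources=[driver_stats_fv],
    schema=[Field(name="conv_rate_adjusted", dtype=Float64)],
    write_to_online_store=True,  # assumed decorator flag, mirroring the attribute in the diff
)
def conv_rate_adjusted(inputs: pd.DataFrame) -> pd.DataFrame:
    out = pd.DataFrame()
    out["conv_rate_adjusted"] = inputs["conv_rate"] * 1.1
    return out

# Assumes a feature_store.yaml in the current directory.
store = FeatureStore(repo_path=".")
store.apply([driver, driver_stats_fv, conv_rate_adjusted])

# Write precomputed values for the on demand view; dicts and lists
# are converted to a pandas DataFrame before ingestion.
store.write_to_online_store(
    feature_view_name="conv_rate_adjusted",
    df={"driver_id": [1001], "conv_rate_adjusted": [0.55]},
)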
13 changes: 9 additions & 4 deletions sdk/python/feast/errors.py
@@ -467,10 +467,15 @@ def __init__(self):


class DataFrameSerializationError(FeastError):
def __init__(self, input_dict: dict):
super().__init__(
f"Failed to serialize the provided dictionary into a pandas DataFrame: {input_dict.keys()}"
)
def __init__(self, input: Any):
if isinstance(input, dict):
super().__init__(
f"Failed to serialize the provided dictionary into a pandas DataFrame: {input.keys()}"
)
else:
super().__init__(
"Failed to serialize the provided input into a pandas DataFrame"
)


class PermissionNotFoundException(FeastError):
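A small sketch of how the widened DataFrameSerializationError constructor above behaves after this change: a dict still yields the key-listing message, while any other input falls back to the generic one. The inputs below are illustrative.

from feast.errors import DataFrameSerializationError

# Dict input: the message includes the dictionary's keys.
print(DataFrameSerializationError({"driver_id": [1001], "conv_rate": [0.5]}))

# Non-dict input (e.g. a list of tuples): the generic message is used.
print(DataFrameSerializationError([("driver_id", 1001)]))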
4 changes: 3 additions & 1 deletion sdk/python/feast/feature_service.py
@@ -84,7 +84,9 @@ def __init__(
if isinstance(feature_grouping, BaseFeatureView):
self.feature_view_projections.append(feature_grouping.projection)

def infer_features(self, fvs_to_update: Dict[str, FeatureView]):
def infer_features(
self, fvs_to_update: Dict[str, Union[FeatureView, BaseFeatureView]]
):
"""
Infers the features for the projections of this feature service, and updates this feature
service in place.
70 changes: 54 additions & 16 deletions sdk/python/feast/feature_store.py
@@ -608,7 +608,12 @@ def _make_inferences(
update_feature_views_with_inferred_features_and_entities(
sfvs_to_update, entities + entities_to_update, self.config
)
# TODO(kevjumba): Update schema inferrence
# We need to attach the time stamp fields to the underlying data sources
# and cascade the dependencies
update_feature_views_with_inferred_features_and_entities(
odfvs_to_update, entities + entities_to_update, self.config
)
# TODO(kevjumba): Update schema inference
for sfv in sfvs_to_update:
if not sfv.schema:
raise ValueError(
@@ -618,8 +623,13 @@ def _make_inferences(
for odfv in odfvs_to_update:
odfv.infer_features()

odfvs_to_write = [
odfv for odfv in odfvs_to_update if odfv.write_to_online_store
]
# Update to include ODFVs with write to online store
fvs_to_update_map = {
view.name: view for view in [*views_to_update, *sfvs_to_update]
view.name: view
for view in [*views_to_update, *sfvs_to_update, *odfvs_to_write]
}
for feature_service in feature_services_to_update:
feature_service.infer_features(fvs_to_update=fvs_to_update_map)
@@ -847,6 +857,11 @@ def apply(
]
sfvs_to_update = [ob for ob in objects if isinstance(ob, StreamFeatureView)]
odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
odfvs_with_writes_to_update = [
ob
for ob in objects
if isinstance(ob, OnDemandFeatureView) and ob.write_to_online_store
]
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
data_sources_set_to_update = {
ob for ob in objects if isinstance(ob, DataSource)
@@ -868,10 +883,23 @@ def apply(
for batch_source in batch_sources_to_add:
data_sources_set_to_update.add(batch_source)

for fv in itertools.chain(views_to_update, sfvs_to_update):
data_sources_set_to_update.add(fv.batch_source)
if fv.stream_source:
data_sources_set_to_update.add(fv.stream_source)
for fv in itertools.chain(
views_to_update, sfvs_to_update, odfvs_with_writes_to_update
):
if isinstance(fv, FeatureView):
data_sources_set_to_update.add(fv.batch_source)
if hasattr(fv, "stream_source"):
if fv.stream_source:
data_sources_set_to_update.add(fv.stream_source)
if isinstance(fv, OnDemandFeatureView):
for source_fvp in fv.source_feature_view_projections:
odfv_batch_source: Optional[DataSource] = (
fv.source_feature_view_projections[source_fvp].batch_source
)
if odfv_batch_source is not None:
data_sources_set_to_update.add(odfv_batch_source)
else:
pass

for odfv in odfvs_to_update:
for v in odfv.source_request_sources.values():
@@ -884,7 +912,9 @@ def apply(

# Validate all feature views and make inferences.
self._validate_all_feature_views(
views_to_update, odfvs_to_update, sfvs_to_update
views_to_update,
odfvs_to_update,
sfvs_to_update,
)
self._make_inferences(
data_sources_to_update,
@@ -989,7 +1019,9 @@ def apply(
tables_to_delete: List[FeatureView] = (
views_to_delete + sfvs_to_delete if not partial else [] # type: ignore
)
tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore
tables_to_keep: List[
Union[FeatureView, StreamFeatureView, OnDemandFeatureView]
] = views_to_update + sfvs_to_update + odfvs_with_writes_to_update # type: ignore

self._get_provider().update_infra(
project=self.project,
@@ -1444,19 +1476,18 @@ def write_to_online_store(
inputs: Optional the dictionary object to be written
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
feature_view_dict = {
fv_proto.name: fv_proto
for fv_proto in self.list_all_feature_views(allow_registry_cache)
}
try:
feature_view: FeatureView = self.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
feature_view = feature_view_dict[feature_view_name]
except FeatureViewNotFoundException:
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
raise FeatureViewNotFoundException(feature_view_name, self.project)
if df is not None and inputs is not None:
raise ValueError("Both df and inputs cannot be provided at the same time.")
if df is None and inputs is not None:
if isinstance(inputs, dict):
if isinstance(inputs, dict) or isinstance(inputs, List):
try:
df = pd.DataFrame(inputs)
except Exception as _:
Expand All @@ -1465,6 +1496,13 @@ def write_to_online_store(
pass
else:
raise ValueError("inputs must be a dictionary or a pandas DataFrame.")
if df is not None and inputs is None:
if isinstance(df, dict) or isinstance(df, List):
try:
df = pd.DataFrame(df)
except Exception as _:
raise DataFrameSerializationError(df)

provider = self._get_provider()
provider.ingest_df(feature_view, df)

Expand Down
39 changes: 23 additions & 16 deletions sdk/python/feast/inference.py
@@ -183,10 +183,13 @@ def update_feature_views_with_inferred_features_and_entities(
)

if not fv.features:
raise RegistryInferenceFailure(
"FeatureView",
f"Could not infer Features for the FeatureView named {fv.name}.",
)
if isinstance(fv, OnDemandFeatureView):
return None
else:
raise RegistryInferenceFailure(
"FeatureView",
f"Could not infer Features for the FeatureView named {fv.name}.",
)


def _infer_features_and_entities(
@@ -209,6 +212,7 @@ def _infer_features_and_entities(
fv, join_keys, run_inference_for_features, config
)

entity_columns: List[Field] = fv.entity_columns if fv.entity_columns else []
columns_to_exclude = {
fv.batch_source.timestamp_field,
fv.batch_source.created_timestamp_column,
@@ -235,7 +239,7 @@ def _infer_features_and_entities(
if field.name not in [
entity_column.name for entity_column in fv.entity_columns
]:
fv.entity_columns.append(field)
entity_columns.append(field)
elif not re.match(
"^__|__$", col_name
): # double underscores often signal an internal-use column
@@ -256,6 +260,8 @@ def _infer_features_and_entities(
if field.name not in [feature.name for feature in fv.features]:
fv.features.append(field)

fv.entity_columns = entity_columns


def _infer_on_demand_features_and_entities(
fv: OnDemandFeatureView,
@@ -282,18 +288,19 @@ def _infer_on_demand_features_and_entities(

batch_source = getattr(source_feature_view, "batch_source")
batch_field_mapping = getattr(batch_source or None, "field_mapping")
if batch_field_mapping:
for (
original_col,
mapped_col,
) in batch_field_mapping.items():
if mapped_col in columns_to_exclude:
columns_to_exclude.remove(mapped_col)
columns_to_exclude.add(original_col)
for (
original_col,
mapped_col,
) in batch_field_mapping.items():
if mapped_col in columns_to_exclude:
columns_to_exclude.remove(mapped_col)
columns_to_exclude.add(original_col)

table_column_names_and_types = batch_source.get_table_column_names_and_types(
config
)
batch_field_mapping = getattr(batch_source, "field_mapping", {})

table_column_names_and_types = (
batch_source.get_table_column_names_and_types(config)
)
for col_name, col_datatype in table_column_names_and_types:
if col_name in columns_to_exclude:
continue
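To make the field_mapping handling in the inference changes above concrete, here is a standalone illustration of the remapping of excluded columns: when a source column is renamed through field_mapping, the exclusion set is rewritten to use the raw source column name. The column names are made up.

# A source column "event_ts" is exposed to Feast as "event_timestamp".
field_mapping = {"event_ts": "event_timestamp"}
columns_to_exclude = {"event_timestamp", "created", "driver_id"}

for original_col, mapped_col in field_mapping.items():
    if mapped_col in columns_to_exclude:
        columns_to_exclude.remove(mapped_col)
        columns_to_exclude.add(original_col)

print(columns_to_exclude)  # {'event_ts', 'created', 'driver_id'}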
Additional changed file (name not shown):
@@ -24,6 +24,7 @@
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names
@@ -80,10 +81,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Additional changed file (name not shown):
@@ -12,6 +12,7 @@
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import RepoConfig
from feast.stream_feature_view import StreamFeatureView

@@ -89,7 +90,7 @@ def update(
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Additional changed file (name not shown):
@@ -23,6 +23,7 @@
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.passthrough_provider import PassthroughProvider
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.stream_feature_view import StreamFeatureView
@@ -77,10 +78,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Additional changed file (name not shown):
@@ -24,6 +24,7 @@
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import FeastConfigBaseModel
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names
@@ -122,10 +123,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/materialization/local_engine.py
@@ -10,6 +10,7 @@
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.stream_feature_view import StreamFeatureView
from feast.utils import (
@@ -69,10 +70,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/materialization/snowflake_engine.py
@@ -31,6 +31,7 @@
get_snowflake_online_store_path,
package_snowpark_zip,
)
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
@@ -120,10 +121,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/online_stores/online_store.py
@@ -17,6 +17,7 @@
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union

from feast import Entity, utils
from feast.batch_feature_view import BatchFeatureView
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
@@ -27,6 +28,7 @@
from feast.protos.feast.types.Value_pb2 import RepeatedValue
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from feast.stream_feature_view import StreamFeatureView


class OnlineStore(ABC):
@@ -288,7 +290,9 @@ def update(
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
tables_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
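With tables_to_keep widened as above, a custom online store's update override should accept batch and stream feature views alongside plain FeatureViews. A minimal sketch of such an override, wrapping the built-in SQLite store; the class and the logging are illustrative, not part of Feast.

from typing import Sequence, Union

from feast import Entity, FeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.infra.online_stores.sqlite import SqliteOnlineStore
from feast.repo_config import RepoConfig
from feast.stream_feature_view import StreamFeatureView


class LoggingOnlineStore(SqliteOnlineStore):
    """Toy online store that logs kept views before delegating to SQLite."""

    def update(
        self,
        config: RepoConfig,
        tables_to_delete: Sequence[FeatureView],
        tables_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        for view in tables_to_keep:
            print(f"keeping online table for view: {view.name}")
        super().update(
            config,
            tables_to_delete,
            tables_to_keep,
            entities_to_delete,
            entities_to_keep,
            partial,
        )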
Diffs for the remaining changed files are not shown.
