Skip to content

Commit

Permalink
feat: Implement gRPC server to ingest streaming features (#3687)
Browse files Browse the repository at this point in the history
* Implemented gRPC server for ingesting streaming features.

Signed-off-by: mehmettokgoz <[email protected]>
Signed-off-by: Danny C <[email protected]>
  • Loading branch information
mehmettokgoz authored Sep 7, 2023
1 parent f2c5988 commit a3fcd1f
Show file tree
Hide file tree
Showing 11 changed files with 999 additions and 696 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ kill-trino-locally:
cd ${ROOT_DIR}; docker stop trino

install-protoc-dependencies:
pip install --ignore-installed protobuf==4.23.4 grpcio-tools==1.47.0 mypy-protobuf==3.1.0
pip install --ignore-installed protobuf==4.23.4 "grpcio-tools>=1.56.2,<2" mypy-protobuf==3.1.0

install-feast-ci-locally:
pip install -e ".[ci]"
Expand Down
27 changes: 27 additions & 0 deletions protos/feast/serving/GrpcServer.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
syntax = "proto3";

message PushRequest {
map<string, string> features = 1;
string stream_feature_view = 2;
bool allow_registry_cache = 3;
string to = 4;
}

message PushResponse {
bool status = 1;
}

message WriteToOnlineStoreRequest {
map<string, string> features = 1;
string feature_view_name = 2;
bool allow_registry_cache = 3;
}

message WriteToOnlineStoreResponse {
bool status = 1;
}

service GrpcFeatureServer {
rpc Push (PushRequest) returns (PushResponse) {};
rpc WriteToOnlineStore (WriteToOnlineStoreRequest) returns (WriteToOnlineStoreResponse);
}
31 changes: 31 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.contrib.grpc_server import get_grpc_server
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import load_repo_config
from feast.repo_operations import (
Expand Down Expand Up @@ -689,6 +690,36 @@ def serve_command(
)


@cli.command("listen")
@click.option(
"--address",
"-a",
type=click.STRING,
default="localhost:50051",
show_default=True,
help="Address of the gRPC server",
)
@click.option(
"--max_workers",
"-w",
type=click.INT,
default=10,
show_default=False,
help="The maximum number of threads that can be used to execute the gRPC calls",
)
@click.pass_context
def listen_command(
ctx: click.Context,
address: str,
max_workers: int,
):
"""Start a gRPC feature server to ingest streaming features on given address"""
store = create_feature_store(ctx)
server = get_grpc_server(address, store, max_workers)
server.start()
server.wait_for_termination()


@cli.command("serve_transformations")
@click.option(
"--port",
Expand Down
95 changes: 95 additions & 0 deletions sdk/python/feast/infra/contrib/grpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import logging
from concurrent import futures

import grpc
import pandas as pd
from grpc_health.v1 import health, health_pb2_grpc

from feast.data_source import PushMode
from feast.errors import PushSourceNotFoundException
from feast.feature_store import FeatureStore
from feast.protos.feast.serving.GrpcServer_pb2 import (
PushResponse,
WriteToOnlineStoreResponse,
)
from feast.protos.feast.serving.GrpcServer_pb2_grpc import (
GrpcFeatureServerServicer,
add_GrpcFeatureServerServicer_to_server,
)


def parse(features):
df = {}
for i in features.keys():
df[i] = [features.get(i)]
return pd.DataFrame.from_dict(df)


class GrpcFeatureServer(GrpcFeatureServerServicer):
fs: FeatureStore

def __init__(self, fs: FeatureStore):
self.fs = fs
super().__init__()

def Push(self, request, context):
try:
df = parse(request.features)
if request.to == "offline":
to = PushMode.OFFLINE
elif request.to == "online":
to = PushMode.ONLINE
elif request.to == "online_and_offline":
to = PushMode.ONLINE_AND_OFFLINE
else:
raise ValueError(
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', "
f"'online_and_offline']."
)
self.fs.push(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
)
except PushSourceNotFoundException as e:
logging.exception(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details(str(e))
return PushResponse(status=False)
except Exception as e:
logging.exception(str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return PushResponse(status=False)
return PushResponse(status=True)

def WriteToOnlineStore(self, request, context):
logging.warning(
"write_to_online_store is deprecated. Please consider using Push instead"
)
try:
df = parse(request.features)
self.fs.write_to_online_store(
feature_view_name=request.feature_view_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
)
except Exception as e:
logging.exception(str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return PushResponse(status=False)
return WriteToOnlineStoreResponse(status=True)


def get_grpc_server(address: str, fs: FeatureStore, max_workers: int):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
add_GrpcFeatureServerServicer_to_server(GrpcFeatureServer(fs), server)
health_servicer = health.HealthServicer(
experimental_non_blocking=True,
experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers),
)
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
server.add_insecure_port(address)
return server
Loading

0 comments on commit a3fcd1f

Please sign in to comment.