diff --git a/python/alibiexplainer/alibiexplainer/explainer.py b/python/alibiexplainer/alibiexplainer/explainer.py
index 02bf8184749..3cfdf919cbd 100644
--- a/python/alibiexplainer/alibiexplainer/explainer.py
+++ b/python/alibiexplainer/alibiexplainer/explainer.py
@@ -72,10 +72,12 @@ def _predict_fn(self, arr: Union[np.ndarray, List]) -> np.ndarray:
             else:
                 instances.append(req_data)
         loop = asyncio.get_running_loop()  # type: ignore
-        resp = loop.run_until_complete(self.predict({"instances": instances}))
+        resp, response_headers = loop.run_until_complete(
+            self.predict({"instances": instances}))
         return np.array(resp["predictions"])
 
     def explain(self, payload: Dict, headers: Dict[str, str] = None) -> Any:
+        response_headers = {'my-header': 'sample'}
         if (
             self.method is ExplainerMethod.anchor_tabular
             or self.method is ExplainerMethod.anchor_images
@@ -84,6 +86,6 @@ def explain(self, payload: Dict, headers: Dict[str, str] = None) -> Any:
             explanation = self.wrapper.explain(payload["instances"])
             explanationAsJsonStr = explanation.to_json()
             logging.info("Explanation: %s", explanationAsJsonStr)
-            return json.loads(explanationAsJsonStr)
+            return json.loads(explanationAsJsonStr), response_headers
 
         raise NotImplementedError
diff --git a/python/alibiexplainer/tests/utils.py b/python/alibiexplainer/tests/utils.py
index ec296e5ce97..c92a941a96b 100644
--- a/python/alibiexplainer/tests/utils.py
+++ b/python/alibiexplainer/tests/utils.py
@@ -28,5 +28,5 @@ def predict_fn(self, arr: Union[np.ndarray, List]) -> np.ndarray:
                 instances.append(req_data.tolist())
             else:
                 instances.append(req_data)
-        resp = self.clf.predict({"instances": instances})
+        resp, response_headers = self.clf.predict({"instances": instances})
         return np.array(resp["predictions"])
diff --git a/python/artexplainer/artserver/model.py b/python/artexplainer/artserver/model.py
index b46ddf96d7b..e8dadad342d 100644
--- a/python/artexplainer/artserver/model.py
+++ b/python/artexplainer/artserver/model.py
@@ -49,11 +49,12 @@ def _predict(self, x):
         scoring_data = {'instances': input_image.tolist()}
 
         loop = asyncio.get_running_loop()
-        resp = loop.run_until_complete(self.predict(scoring_data))
+        resp, response_headers = loop.run_until_complete(self.predict(scoring_data))
         prediction = np.array(resp["predictions"])
         return [1 if x == prediction else 0 for x in range(0, self.nb_classes)]
 
     def explain(self, payload: Dict, headers: Dict[str, str] = None) -> Dict:
+        response_headers = {}
         image = payload["instances"][0]
         label = payload["instances"][1]
         try:
@@ -74,7 +75,9 @@ def explain(self, payload: Dict, headers: Dict[str, str] = None) -> Dict:
             adv_preds = np.argmax(classifier.predict(x_adv))
             l2_error = np.linalg.norm(np.reshape(x_adv[0] - inputs, [-1]))
 
-            return {"explanations": {"adversarial_example": x_adv.tolist(), "L2 error": l2_error.tolist(),
-                                     "adversarial_prediction": adv_preds.tolist(), "prediction": preds.tolist()}}
+            return (
+                {"explanations": {"adversarial_example": x_adv.tolist(), "L2 error": l2_error.tolist(),
+                                  "adversarial_prediction": adv_preds.tolist(), "prediction": preds.tolist()}},
+                response_headers)
         except Exception as e:
             raise Exception("Failed to explain %s" % e)
diff --git a/python/custom_model/model.py b/python/custom_model/model.py
index 4ab7140cc4e..5215cbb64ed 100644
--- a/python/custom_model/model.py
+++ b/python/custom_model/model.py
@@ -84,6 +84,7 @@ def preprocess(self, payload: Union[Dict, InferRequest], headers: Dict[str, str]
         return input_tensor.unsqueeze(0)
 
     def predict(self, input_tensor: torch.Tensor, headers: Dict[str, str] = None) -> Union[Dict, InferResponse]:
+        response_headers = {}
         output = self.model(input_tensor)
         torch.nn.functional.softmax(output, dim=1)
         values, top_5 = torch.topk(output, 5)
@@ -92,9 +93,9 @@ def predict(self, input_tensor: torch.Tensor, headers: Dict[str, str] = None) ->
         infer_output = InferOutput(name="output-0", shape=list(values.shape), datatype="FP32", data=result)
         infer_response = InferResponse(model_name=self.name, infer_outputs=[infer_output], response_id=response_id)
         if "request-type" in headers and headers["request-type"] == "v1":
-            return {"predictions": result}
+            return {"predictions": result}, response_headers
         else:
-            return infer_response
+            return infer_response, response_headers
 
 
 parser = argparse.ArgumentParser(parents=[model_server.parser])
diff --git a/python/custom_model/model_grpc.py b/python/custom_model/model_grpc.py
index 00d9a1cf716..109624201a2 100644
--- a/python/custom_model/model_grpc.py
+++ b/python/custom_model/model_grpc.py
@@ -57,6 +57,7 @@ def preprocess(self, payload: InferRequest, headers: Dict[str, str] = None) -> t
         return torch.Tensor(np_array)
 
     def predict(self, input_tensor: torch.Tensor, headers: Dict[str, str] = None) -> Dict:
+        response_headers = {}
         output = self.model(input_tensor)
         torch.nn.functional.softmax(output, dim=1)
         values, top_5 = torch.topk(output, 5)
@@ -75,7 +76,7 @@ def predict(self, input_tensor: torch.Tensor, headers: Dict[str, str] = None) ->
                 "shape": list(values.shape)
             }
         ]}
-        return response
+        return response, response_headers
 
 
 if __name__ == "__main__":
diff --git a/python/huggingfaceserver/huggingfaceserver/model.py b/python/huggingfaceserver/huggingfaceserver/model.py
index 0d2ef725145..7575ee96064 100644
--- a/python/huggingfaceserver/huggingfaceserver/model.py
+++ b/python/huggingfaceserver/huggingfaceserver/model.py
@@ -196,6 +196,7 @@ async def generate(self, generate_request: GenerateRequest, headers: Dict[str, s
 
     async def predict(self, input_batch: Union[BatchEncoding, InferRequest], context: Dict[str, Any] = None) \
             -> Union[Tensor, InferResponse]:
+        response_headers = {}
         if self.predictor_host:
             # when predictor_host is provided, serialize the tensor and send to optimized model serving runtime
             # like NVIDIA triton inference server
@@ -208,7 +209,7 @@ async def predict(self, input_batch: Union[BatchEncoding, InferRequest], context
                     outputs = self.model.generate(**input_batch)
                 else:
                     outputs = self.model(**input_batch).logits
-            return outputs
+            return outputs, response_headers
         except Exception as e:
             raise InferenceError(str(e))
diff --git a/python/huggingfaceserver/huggingfaceserver/test_model.py b/python/huggingfaceserver/huggingfaceserver/test_model.py
index 0cf957d0696..2ba74bfb7ec 100644
--- a/python/huggingfaceserver/huggingfaceserver/test_model.py
+++ b/python/huggingfaceserver/huggingfaceserver/test_model.py
@@ -27,7 +27,7 @@ def test_t5():
     model.load()
 
     request = "translate this to germany"
-    response = asyncio.run(model({"instances": [request, request]}, headers={}))
+    response, response_headers = asyncio.run(model({"instances": [request, request]}, headers={}))
     assert response == {"predictions": ['Das ist für Deutschland', 'Das ist für Deutschland']}
 
 
@@ -35,8 +35,8 @@ def test_bert():
     model = HuggingfaceModel("bert-base-uncased",
                              {"model_id": "bert-base-uncased", "do_lower_case": True})
     model.load()
 
-    response = asyncio.run(model({"instances": ["The capital of France is [MASK].",
-                                                "The capital of [MASK] is paris."]}, headers={}))
+    response, response_headers = asyncio.run(model({"instances": ["The capital of France is [MASK].",
France is [MASK].", + "The capital of [MASK] is paris."]}, headers={})) assert response == {"predictions": ["paris", "france"]} @@ -51,7 +51,7 @@ def test_bert_predictor_host(httpx_mock: HTTPXMock): predictor_host="localhost:8081", predictor_protocol="v2")) model.load() - response = asyncio.run(model({"instances": ["The capital of France is [MASK]."]}, headers={})) + response, response_headers = asyncio.run(model({"instances": ["The capital of France is [MASK]."]}, headers={})) assert response == {"predictions": ["[PAD]"]} @@ -62,7 +62,7 @@ def test_bert_sequence_classification(): model.load() request = "Hello, my dog is cute." - response = asyncio.run(model({"instances": [request, request]}, headers={})) + response, response_headers = asyncio.run(model({"instances": [request, request]}, headers={})) assert response == {"predictions": [1, 1]} @@ -73,7 +73,7 @@ def test_bert_token_classification(): model.load() request = "HuggingFace is a company based in Paris and New York" - response = asyncio.run(model({"instances": [request, request]}, headers={})) + response, response_headers = asyncio.run(model({"instances": [request, request]}, headers={})) assert response == {"predictions": [[[0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], [[0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]]} diff --git a/python/kserve/kserve/model.py b/python/kserve/kserve/model.py index c503faf95c5..8552608febc 100644 --- a/python/kserve/kserve/model.py +++ b/python/kserve/kserve/model.py @@ -15,7 +15,7 @@ import inspect import time from enum import Enum -from typing import Dict, List, Union, Optional, AsyncIterator, Any +from typing import Dict, List, Tuple, Union, Optional, AsyncIterator, Any import grpc import httpx @@ -135,14 +135,18 @@ async def __call__(self, body: Union[Dict, CloudEvent, InferRequest], if verb == InferenceVerb.EXPLAIN: with EXPLAIN_HIST_TIME.labels(**prom_labels).time(): start = time.time() - response = (await self.explain(payload, headers)) if inspect.iscoroutinefunction(self.explain) \ - else self.explain(payload, headers) + if inspect.iscoroutinefunction(self.explain): + response, response_headers = (await self.explain(payload, headers)) + else: + response, response_headers = self.explain(payload, headers) explain_ms = get_latency_ms(start, time.time()) elif verb == InferenceVerb.PREDICT: with PREDICT_HIST_TIME.labels(**prom_labels).time(): start = time.time() - response = (await self.predict(payload, headers)) if inspect.iscoroutinefunction(self.predict) \ - else self.predict(payload, headers) + if inspect.iscoroutinefunction(self.predict): + response, response_headers = (await self.predict(payload, headers)) + else: + response, response_headers = self.predict(payload, headers) predict_ms = get_latency_ms(start, time.time()) else: raise NotImplementedError @@ -158,7 +162,7 @@ async def __call__(self, body: Union[Dict, CloudEvent, InferRequest], f"explain_ms: {explain_ms}, predict_ms: {predict_ms}, " f"postprocess_ms: {postprocess_ms}") - return response + return response, response_headers @property def _http_client(self): @@ -174,10 +178,12 @@ def _grpc_client(self): port = 443 if self.use_ssl else 80 self.predictor_host = f"{self.predictor_host}:{port}" if self.use_ssl: - _channel = grpc.aio.secure_channel(self.predictor_host, grpc.ssl_channel_credentials()) + _channel = grpc.aio.secure_channel( + self.predictor_host, grpc.ssl_channel_credentials()) else: _channel = grpc.aio.insecure_channel(self.predictor_host) - self._grpc_client_stub = grpc_predict_v2_pb2_grpc.GRPCInferenceServiceStub(_channel) + 
+                _channel)
         return self._grpc_client_stub
 
     def validate(self, payload):
@@ -252,9 +258,11 @@ async def postprocess(self, result: Union[Dict, InferResponse], headers: Dict[st
 
     async def _http_predict(self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None) -> Dict:
         protocol = "https" if self.use_ssl else "http"
-        predict_url = PREDICTOR_URL_FORMAT.format(protocol, self.predictor_host, self.name)
+        predict_url = PREDICTOR_URL_FORMAT.format(
+            protocol, self.predictor_host, self.name)
         if self.protocol == PredictorProtocol.REST_V2.value:
-            predict_url = PREDICTOR_V2_URL_FORMAT.format(protocol, self.predictor_host, self.name)
+            predict_url = PREDICTOR_V2_URL_FORMAT.format(
+                protocol, self.predictor_host, self.name)
 
         # Adjusting headers. Inject content type if not exist.
         # Also, removing host, as the header is the one passed to transformer and contains transformer's host
@@ -283,11 +291,13 @@ async def _http_predict(self, payload: Union[Dict, InferRequest], headers: Dict[
                 if "error" in error_message:
                     error_message = error_message["error"]
             message = message.format(response, error_message=error_message)
-            raise HTTPStatusError(message, request=response.request, response=response)
-        return orjson.loads(response.content)
+            raise HTTPStatusError(
+                message, request=response.request, response=response)
+        return orjson.loads(response.content), response.headers
 
     async def _grpc_predict(self, payload: Union[ModelInferRequest, InferRequest], headers: Dict[str, str] = None) \
             -> ModelInferResponse:
+        response_headers = {}
         if isinstance(payload, InferRequest):
             payload = payload.to_grpc()
         async_result = await self._grpc_client.ModelInfer(
@@ -297,12 +307,11 @@ async def _grpc_predict(self, payload: Union[ModelInferRequest, InferRequest], h
                 ('response_type', 'grpc_v2'),
                 ('x-request-id', headers.get('x-request-id', '')))
         )
-        return async_result
+        return async_result, response_headers
 
     async def predict(self, payload: Union[Dict, InferRequest, ModelInferRequest],
-                      headers: Dict[str, str] = None) -> Union[Dict, InferResponse]:
-        """ The `predict` handler can be overridden for performing the inference.
-            By default, the predict handler makes call to predictor for the inference step.
+                      headers: Dict[str, str] = None) -> Tuple[Union[Dict, InferResponse], Dict]:
+        """
 
         Args:
             payload: Model inputs passed from `preprocess` handler.
@@ -317,12 +326,20 @@ async def predict(self, payload: Union[Dict, InferRequest, ModelInferRequest],
         if not self.predictor_host:
             raise NotImplementedError("Could not find predictor_host.")
         if self.protocol == PredictorProtocol.GRPC_V2.value:
-            res = await self._grpc_predict(payload, headers)
-            return InferResponse.from_grpc(res)
+            response_content, response_headers = await self._grpc_predict(payload, headers)
+            return InferResponse.from_grpc(response_content), response_headers
         else:
-            res = await self._http_predict(payload, headers)
+            response_content, response_headers = await self._http_predict(payload, headers)
+            # Check if 'Content-Length' header exists in the response headers
+            if 'Content-Length' in response_headers:
+                # Remove the 'Content-Length' from response header
+                del response_headers['Content-Length']
+
         # return an InferResponse if this is REST V2, otherwise just return the dictionary
-        return InferResponse.from_rest(self.name, res) if is_v2(PredictorProtocol(self.protocol)) else res
+        if is_v2(PredictorProtocol(self.protocol)):
+            return InferResponse.from_rest(self.name, response_content), response_headers
+        else:
+            return response_content, response_headers
 
     async def generate(self, payload: GenerateRequest,
                        headers: Dict[str, str] = None) -> Union[GenerateResponse, AsyncIterator[Any]]:
@@ -358,4 +375,4 @@ async def explain(self, payload: Dict, headers: Dict[str, str] = None) -> Dict:
         )
         response.raise_for_status()
 
-        return orjson.loads(response.content)
+        return orjson.loads(response.content), response.headers
diff --git a/python/kserve/kserve/protocol/dataplane.py b/python/kserve/kserve/protocol/dataplane.py
index 5c3881c8073..08c3eaa33a7 100644
--- a/python/kserve/kserve/protocol/dataplane.py
+++ b/python/kserve/kserve/protocol/dataplane.py
@@ -266,7 +266,10 @@ def decode_cloudevent(self, body) -> Tuple[Union[Dict, InferRequest], Dict]:
         return decoded_body, attributes
 
     def encode(self, model_name, response, headers, req_attributes: Dict) -> Tuple[Dict, Dict[str, str]]:
-        response_headers = {}
+        response_headers = headers
+        if not headers:
+            response_headers = {}
+
         # if we received a cloudevent, then also return a cloudevent
         is_cloudevent = False
         is_binary_cloudevent = False
@@ -317,12 +320,12 @@ async def infer(
         # call model locally or remote model workers
         model = self.get_model(model_name)
         if isinstance(model, RayServeSyncHandle):
-            response = ray.get(model.remote(request, headers=headers))
+            response, response_headers = ray.get(model.remote(request, headers=headers))
         elif isinstance(model, (RayServeHandle, DeploymentHandle)):
-            response = await model.remote(request, headers=headers)
+            response, response_headers = await model.remote(request, headers=headers)
         else:
-            response = await model(request, headers=headers)
-        return response, headers
+            response, response_headers = await model(request, headers=headers)
+        return response, response_headers
 
     async def generate(
             self,
@@ -368,9 +371,9 @@ async def explain(self, model_name: str,
         # call model locally or remote model workers
         model = self.get_model(model_name)
         if isinstance(model, RayServeSyncHandle):
-            response = ray.get(model.remote(request, verb=InferenceVerb.EXPLAIN))
+            response, response_headers = ray.get(model.remote(request, verb=InferenceVerb.EXPLAIN))
         elif isinstance(model, (RayServeHandle, DeploymentHandle)):
-            response = await model.remote(request, verb=InferenceVerb.EXPLAIN)
+            response, response_headers = await model.remote(request, verb=InferenceVerb.EXPLAIN)
         else:
-            response = await model(request, verb=InferenceVerb.EXPLAIN)
-        return response, headers
+            response, response_headers = await model(request, verb=InferenceVerb.EXPLAIN)
+        return response, response_headers
diff --git a/python/kserve/kserve/protocol/rest/v1_endpoints.py b/python/kserve/kserve/protocol/rest/v1_endpoints.py
index 11c31586229..2a33818d5ae 100644
--- a/python/kserve/kserve/protocol/rest/v1_endpoints.py
+++ b/python/kserve/kserve/protocol/rest/v1_endpoints.py
@@ -14,6 +14,7 @@
 from typing import Optional, Union, Dict, List
 
 from fastapi import Request, Response
+from fastapi.responses import JSONResponse
 
 from kserve.errors import ModelNotReady
 from ..dataplane import DataPlane
@@ -78,11 +79,12 @@ async def predict(self, model_name: str, request: Request) -> Union[Response, Di
                                                                 headers=headers)
         response, response_headers = self.dataplane.encode(model_name=model_name, response=response,
-                                                           headers=headers, req_attributes=req_attributes)
+                                                           headers=response_headers, req_attributes=req_attributes)
 
-        if not isinstance(response, dict):
-            return Response(content=response, headers=response_headers)
-        return response
+        if isinstance(response, dict):
+            return JSONResponse(content=response, headers=response_headers)
+
+        return Response(content=response, headers=response_headers)
 
     async def explain(self, model_name: str, request: Request) -> Union[Response, Dict]:
         """Explain handler.
@@ -108,8 +110,9 @@ async def explain(self, model_name: str, request: Request) -> Union[Response, Di
                                                                   headers=headers)
         response, response_headers = self.dataplane.encode(model_name=model_name, response=response,
-                                                           headers=headers, req_attributes=req_attributes)
+                                                           headers=response_headers, req_attributes=req_attributes)
+
+        if isinstance(response, dict):
+            return JSONResponse(content=response, headers=response_headers)
 
-        if not isinstance(response, dict):
-            return Response(content=response, headers=response_headers)
-        return response
+        return Response(content=response, headers=response_headers)
diff --git a/python/kserve/kserve/protocol/rest/v2_endpoints.py b/python/kserve/kserve/protocol/rest/v2_endpoints.py
index 33682209e60..70b8c29cce0 100644
--- a/python/kserve/kserve/protocol/rest/v2_endpoints.py
+++ b/python/kserve/kserve/protocol/rest/v2_endpoints.py
@@ -148,6 +148,7 @@ async def infer(
         response, response_headers = await self.dataplane.infer(model_name=model_name,
                                                                  request=infer_request,
                                                                  headers=request_headers)
+        # response is an object that should be used like response.headers for accessing headers
 
         response, response_headers = self.dataplane.encode(model_name=model_name,
                                                            response=response,
diff --git a/python/kserve/test/test_server.py b/python/kserve/test/test_server.py
index f9a24bd09d7..97e12905337 100644
--- a/python/kserve/test/test_server.py
+++ b/python/kserve/test/test_server.py
@@ -79,15 +79,17 @@ def load(self):
         self.ready = True
 
     async def predict(self, request, headers=None):
+        response_headers = {}
         if isinstance(request, InferRequest):
             inputs = get_predict_input(request)
             infer_response = get_predict_response(request, inputs, self.name)
-            return infer_response
+            return infer_response, response_headers
         else:
-            return {"predictions": request["instances"]}
+            return {"predictions": request["instances"]}, response_headers
 
     async def explain(self, request, headers=None):
-        return {"predictions": request["instances"]}
+        response_headers = {}
+        return {"predictions": request["instances"]}, response_headers
 
 
 @serve.deployment
@@ -101,15 +103,17 @@ def load(self):
         self.ready = True
 
     async def predict(self, request, headers=None):
+        response_headers = {}
         if isinstance(request, InferRequest):
             inputs = get_predict_input(request)
             infer_response = get_predict_response(request, inputs, self.name)
-            return infer_response
+            return infer_response, response_headers
         else:
-            return {"predictions": request["instances"]}
+            return {"predictions": request["instances"]}, response_headers
 
     async def explain(self, request, headers=None):
-        return {"predictions": request["instances"]}
+        response_headers = {}
+        return {"predictions": request["instances"]}, response_headers
 
 
 class DummyCEModel(Model):
@@ -122,10 +126,12 @@ def load(self):
         self.ready = True
 
     async def predict(self, request, headers=None):
-        return {"predictions": request["instances"]}
+        response_headers = headers
+        return {"predictions": request["instances"]}, response_headers
 
     async def explain(self, request, headers=None):
-        return {"predictions": request["instances"]}
+        response_headers = headers
+        return {"predictions": request["instances"]}, response_headers
 
 
 class DummyAvroCEModel(Model):
@@ -154,12 +160,18 @@ def preprocess(self, request, headers: Dict[str, str] = None):
         return self._parserequest(request)
 
     async def predict(self, request, headers=None):
-        return {"predictions": [[request['name'], request['favorite_number'],
-                                 request['favorite_color']]]}
+        response_headers = headers
+        return (
+            {"predictions": [
+                [request['name'], request['favorite_number'], request['favorite_color']]]},
+            response_headers)
 
     async def explain(self, request, headers=None):
-        return {"predictions": [[request['name'], request['favorite_number'],
-                                 request['favorite_color']]]}
+        response_headers = headers
+        return (
+            {"predictions": [
+                [request['name'], request['favorite_number'], request['favorite_color']]]},
+            response_headers)
 
 
 class DummyModelRepository(ModelRepository):
@@ -214,7 +226,8 @@ def app(self):
         model.load()
         server = ModelServer()
         server.register_model(model)
-        rest_server = RESTServer(server.dataplane, server.model_repository_extension)
+        rest_server = RESTServer(
+            server.dataplane, server.model_repository_extension)
         return rest_server.create_application()
 
     @pytest.fixture(scope="class")
@@ -233,7 +246,8 @@ def test_model_v1(self, http_server_client):
     def test_unknown_model_v1(self, http_server_client):
         resp = http_server_client.get('/v1/models/InvalidModel')
         assert resp.status_code == 404
-        assert resp.json() == {"error": "Model with name InvalidModel does not exist."}
+        assert resp.json() == {
+            "error": "Model with name InvalidModel does not exist."}
 
     def test_list_models_v1(self, http_server_client):
         resp = http_server_client.get('/v1/models')
@@ -319,6 +333,7 @@ def test_infer_parameters_v2(self, http_server_client):
         input_data = json.dumps(req.to_rest()).encode('utf-8')
 
         with patch.object(DummyModel, 'predict', new_callable=mock.Mock) as mock_predict:
+            response_headers = {}
             mock_predict.return_value = InferResponse(model_name="TestModel", response_id="123",
                                                       parameters={
                                                           "test-str": "dummy",
                                                           "test-bool": True,
                                                           "test-int": 100
@@ -333,7 +348,7 @@ def test_infer_parameters_v2(self, http_server_client):
                                                               "test-str": "dummy",
                                                               "test-bool": True,
                                                               "test-int": 100
-                                                          })])
+                                                          })]), response_headers
             resp = http_server_client.post('/v2/models/TestModel/infer', content=input_data)
             mock_predict.assert_called_with(req, mock.ANY)
@@ -356,7 +371,8 @@ def app(self):  # pylint: disable=no-self-use
         server = ModelServer()
         server.register_model_handle("TestModel", handle)
-        rest_server = RESTServer(server.dataplane, server.model_repository_extension)
+        rest_server = RESTServer(
+            server.dataplane, server.model_repository_extension)
         return rest_server.create_application()
 
     @pytest.fixture(scope='class')
@@ -410,7 +426,8 @@ def app(self):  # pylint: disable=no-self-use
         model = DummyModel("TestModel")
         server = ModelServer()
         server.register_model(model)
-        rest_server = RESTServer(server.dataplane, server.model_repository_extension)
+        rest_server = RESTServer(
+            server.dataplane, server.model_repository_extension)
         return rest_server.create_application()
 
     @pytest.fixture(scope='class')
@@ -429,7 +446,8 @@ def app(self):  # pylint: disable=no-self-use
         model.load()
         server = ModelServer()
         server.register_model(model)
-        rest_server = RESTServer(server.dataplane, server.model_repository_extension)
+        rest_server = RESTServer(
+            server.dataplane, server.model_repository_extension)
         return rest_server.create_application()
 
     @pytest.fixture(scope='class')
@@ -472,7 +490,8 @@ def test_predict_custom_ce_attributes(self, http_server_client):
 
     def test_predict_merge_structured_ce_attributes(self, http_server_client):
         with mock.patch.dict(os.environ, {"CE_MERGE": "true"}):
-            event = dummy_cloud_event({"instances": [[1, 2]]}, add_extension=True)
+            event = dummy_cloud_event(
+                {"instances": [[1, 2]]}, add_extension=True)
             headers, body = to_structured(event)
 
             resp = http_server_client.post('/v1/models/TestModel:predict', headers=headers, content=body)
@@ -485,12 +504,14 @@ def test_predict_merge_structured_ce_attributes(self, http_server_client):
             assert body["data"] == {"predictions": [[1, 2]]}
             assert body['source'] == "io.kserve.inference.TestModel"
             assert body['type'] == "io.kserve.inference.response"
-            assert body["custom-extension"] == "custom-value"  # Added by add_extension=True in dummy_cloud_event
+            # Added by add_extension=True in dummy_cloud_event
+            assert body["custom-extension"] == "custom-value"
             assert body['time'] > "2021-01-28T21:04:43.144141+00:00"
 
     def test_predict_merge_binary_ce_attributes(self, http_server_client):
         with mock.patch.dict(os.environ, {"CE_MERGE": "true"}):
-            event = dummy_cloud_event({"instances": [[1, 2]]}, set_contenttype=True, add_extension=True)
+            event = dummy_cloud_event(
+                {"instances": [[1, 2]]}, set_contenttype=True, add_extension=True)
             headers, body = to_binary(event)
 
             resp = http_server_client.post('/v1/models/TestModel:predict', headers=headers, content=body)
@@ -507,7 +528,8 @@ def test_predict_merge_binary_ce_attributes(self, http_server_client):
         assert resp.content == b'{"predictions": [[1, 2]]}'
 
     def test_predict_ce_binary_dict(self, http_server_client):
-        event = dummy_cloud_event({"instances": [[1, 2]]}, set_contenttype=True)
+        event = dummy_cloud_event(
+            {"instances": [[1, 2]]}, set_contenttype=True)
         headers, body = to_binary(event)
 
         resp = http_server_client.post('/v1/models/TestModel:predict', headers=headers, content=body)
@@ -522,7 +544,8 @@ def test_predict_ce_binary_dict(self, http_server_client):
         assert resp.content == b'{"predictions": [[1, 2]]}'
 
     def test_predict_ce_binary_bytes(self, http_server_client):
-        event = dummy_cloud_event(b'{"instances":[[1,2]]}', set_contenttype=True)
+        event = dummy_cloud_event(
+            b'{"instances":[[1,2]]}', set_contenttype=True)
         headers, body = to_binary(event)
 
         resp = http_server_client.post('/v1/models/TestModel:predict', headers=headers, content=body)
@@ -548,7 +571,8 @@ def test_predict_ce_bytes_bad_format_exception(self, http_server_client):
         assert error_regex.match(response["error"]) is not None
 
     def test_predict_ce_bytes_bad_hex_format_exception(self, http_server_client):
-        event = dummy_cloud_event(b'0\x80\x80\x06World!\x00\x00', set_contenttype=True)
+        event = dummy_cloud_event(
+            b'0\x80\x80\x06World!\x00\x00', set_contenttype=True)
         headers, body = to_binary(event)
 
         resp = http_server_client.post('/v1/models/TestModel:predict', headers=headers, content=body)
@@ -568,7 +592,8 @@ def app(self):  # pylint: disable=no-self-use
         model.load()
         server = ModelServer()
         server.register_model(model)
-        rest_server = RESTServer(server.dataplane, server.model_repository_extension)
+        rest_server = RESTServer(
+            server.dataplane, server.model_repository_extension)
         return rest_server.create_application()
 
     @pytest.fixture(scope='class')
@@ -585,7 +610,8 @@ def test_predict_ce_avro_binary(self, http_server_client):
         writer.write(msg, encoder)
         data = bytes_writer.getvalue()
 
-        event = dummy_cloud_event(data, set_contenttype=True, contenttype="application/avro")
+        event = dummy_cloud_event(
+            data, set_contenttype=True, contenttype="application/avro")
         # Creates the HTTP request representation of the CloudEvent in binary content mode
         headers, body = to_binary(event)
         resp = http_server_client.post('/v1/models/TestModel:predict', headers=headers, content=body)
@@ -604,8 +630,10 @@ class TestTFHttpServerLoadAndUnLoad:
     @pytest.fixture(scope="class")
     def app(self):  # pylint: disable=no-self-use
-        server = ModelServer(registered_models=DummyModelRepository(test_load_success=True))
-        rest_server = RESTServer(server.dataplane, server.model_repository_extension)
+        server = ModelServer(
+            registered_models=DummyModelRepository(test_load_success=True))
+        rest_server = RESTServer(
+            server.dataplane, server.model_repository_extension)
         return rest_server.create_application()
 
     @pytest.fixture(scope='class')
@@ -626,8 +654,10 @@ def test_unload(self, http_server_client):
 
 
 class TestTFHttpServerLoadAndUnLoadFailure:
     @pytest.fixture(scope="class")
     def app(self):  # pylint: disable=no-self-use
-        server = ModelServer(registered_models=DummyModelRepository(test_load_success=False))
-        rest_server = RESTServer(server.dataplane, server.model_repository_extension)
+        server = ModelServer(
+            registered_models=DummyModelRepository(test_load_success=False))
+        rest_server = RESTServer(
+            server.dataplane, server.model_repository_extension)
         return rest_server.create_application()
 
     @pytest.fixture(scope='class')
@@ -649,7 +679,8 @@ def app(self):  # pylint: disable=no-self-use
         model = DummyModel("TestModel")
         server = ModelServer()
         server.register_model(model)
-        rest_server = RESTServer(server.dataplane, server.model_repository_extension)
+        rest_server = RESTServer(
+            server.dataplane, server.model_repository_extension)
         return rest_server.create_application()
 
     @pytest.fixture(scope='class')
diff --git a/python/lgbserver/lgbserver/model.py b/python/lgbserver/lgbserver/model.py
index cd42e8a60a7..af2b8d492eb 100644
--- a/python/lgbserver/lgbserver/model.py
+++ b/python/lgbserver/lgbserver/model.py
@@ -57,9 +57,10 @@ def load(self) -> bool:
         return self.ready
 
     def predict(self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None) -> Union[Dict, InferResponse]:
+        response_headers = {}
         try:
             instances = get_predict_input(payload, columns=self._booster.feature_name())
             result = self._booster.predict(instances)
-            return get_predict_response(payload, result, self.name)
+            return get_predict_response(payload, result, self.name), response_headers
         except Exception as e:
             raise InferenceError(str(e))
diff --git a/python/lgbserver/lgbserver/test_model.py b/python/lgbserver/lgbserver/test_model.py
index 0e72aacc7cc..0368a043ef1 100644
--- a/python/lgbserver/lgbserver/test_model.py
+++ b/python/lgbserver/lgbserver/test_model.py
@@ -46,37 +46,37 @@ def test_model():
     request = {'sepal_width_(cm)': {0: 3.5}, 'petal_length_(cm)': {0: 1.4},
                'petal_width_(cm)': {0: 0.2}, 'sepal_length_(cm)': {0: 5.1}}
-    response = model.predict({"inputs": [request, request]})
+    response, response_headers = model.predict({"inputs": [request, request]})
     assert numpy.argmax(response["predictions"][0]) == 0
 
-    response = model.predict({"instances": [request, request]})
+    response, response_headers = model.predict({"instances": [request, request]})
     assert numpy.argmax(response["predictions"][0]) == 0
 
     request = [
         {'sepal_width_(cm)': 3.5},
         {'petal_length_(cm)': 1.4},
        {'petal_width_(cm)': 0.2},
         {'sepal_length_(cm)': 5.1}
     ]
-    response = model.predict({"inputs": [request, request]})
+    response, response_headers = model.predict({"inputs": [request, request]})
     assert numpy.argmax(response["predictions"][0]) == 0
 
-    response = model.predict({"instances": [request, request]})
+    response, response_headers = model.predict({"instances": [request, request]})
     assert numpy.argmax(response["predictions"][0]) == 0
 
     request = [
         {'sepal_width_(cm)': 3.5},
         {'petal_length_(cm)': 1.4},
         {'petal_width_(cm)': 0.2}
     ]
-    response = model.predict({"inputs": [request, request]})
+    response, response_headers = model.predict({"inputs": [request, request]})
     assert numpy.argmax(response["predictions"][0]) == 0
 
-    response = model.predict({"instances": [request, request]})
+    response, response_headers = model.predict({"instances": [request, request]})
     assert numpy.argmax(response["predictions"][0]) == 0
 
     # test v2 handler
     infer_input = InferInput(name="input-0", shape=[2, 4], datatype="FP32",
                              data=[[6.8, 2.8, 4.8, 1.6], [6.0, 3.4, 4.5, 1.6]])
     infer_request = InferRequest(model_name="model", infer_inputs=[infer_input])
-    infer_response = model.predict(infer_request)
+    infer_response, response_headers = model.predict(infer_request)
     assert infer_response.to_rest()["outputs"] == \
         [{'name': 'output-0', 'shape': [2, 3], 'datatype': 'FP64',
           'data': [3.7899802486733807e-06, 0.9996982074114203, 0.00029800260833088297,
diff --git a/python/paddleserver/paddleserver/model.py b/python/paddleserver/paddleserver/model.py
index cd40f58d49c..bd6d243e989 100644
--- a/python/paddleserver/paddleserver/model.py
+++ b/python/paddleserver/paddleserver/model.py
@@ -65,12 +65,13 @@ def get_model_files(ext: str) -> str:
         return self.ready
 
     def predict(self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None) -> Union[Dict, InferResponse]:
+        response_headers = {}
         try:
             instances = get_predict_input(payload)
             np_array_input = np.array(instances, dtype='float32')
             self.input_tensor.copy_from_cpu(np_array_input)
             self.predictor.run()
             result = self.output_tensor.copy_to_cpu()
-            return get_predict_response(payload, result, self.name)
+            return get_predict_response(payload, result, self.name), response_headers
         except Exception as e:
             raise InferenceError(str(e))
diff --git a/python/paddleserver/paddleserver/test_model.py b/python/paddleserver/paddleserver/test_model.py
index 74494fc3d55..fb3ff41d045 100644
--- a/python/paddleserver/paddleserver/test_model.py
+++ b/python/paddleserver/paddleserver/test_model.py
@@ -46,7 +46,7 @@ def test_model():
     def test_img(filename: str, expected: int):
         img = cv2.imread(os.path.join(model_dir, filename))
         request = {"instances": face_detect_preprocess(img)}
-        response = server.predict(request)
+        response, response_headers = server.predict(request)
         faces = response["predictions"]
response["predictions"] assert sum(face[1] > 0.5 for face in faces) == expected diff --git a/python/pmmlserver/pmmlserver/model.py b/python/pmmlserver/pmmlserver/model.py index eadc4de229f..67eba7ffb5b 100644 --- a/python/pmmlserver/pmmlserver/model.py +++ b/python/pmmlserver/pmmlserver/model.py @@ -60,10 +60,11 @@ def load(self) -> bool: return self.ready def predict(self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None) -> Union[Dict, InferResponse]: + response_headers = {} try: instances = get_predict_input(payload) results = [self.evaluator.evaluate( dict(zip(self.input_fields, instance))) for instance in instances] - return get_predict_response(payload, pd.DataFrame(results), self.name) + return get_predict_response(payload, pd.DataFrame(results), self.name), response_headers except Exception as e: raise InferenceError(str(e)) diff --git a/python/pmmlserver/pmmlserver/test_model.py b/python/pmmlserver/pmmlserver/test_model.py index a7e1f6705cc..696ced550f6 100644 --- a/python/pmmlserver/pmmlserver/test_model.py +++ b/python/pmmlserver/pmmlserver/test_model.py @@ -25,7 +25,7 @@ def test_model(): server.load() request = {"instances": [[5.1, 3.5, 1.4, 0.2]]} - response = server.predict(request) + response, response_headers = server.predict(request) expect_result = [{'Species': 'setosa', 'Probability_setosa': 1.0, 'Probability_versicolor': 0.0, @@ -42,7 +42,7 @@ def test_model_v2(): infer_input = InferInput(name="input-0", shape=[1, 4], datatype="FP32", data=[[5.1, 3.5, 1.4, 0.2]]) request = InferRequest(model_name="model", infer_inputs=[infer_input]) - response = server.predict(request) + response, response_headers = server.predict(request) expect_result = [ {'name': 'Species', 'shape': [1], 'datatype': 'BYTES', 'data': ['setosa']}, {'name': 'Probability_setosa', 'shape': [1], 'datatype': 'FP64', 'data': [1.0]}, diff --git a/python/sklearnserver/sklearnserver/model.py b/python/sklearnserver/sklearnserver/model.py index ff774b45521..d657c145063 100644 --- a/python/sklearnserver/sklearnserver/model.py +++ b/python/sklearnserver/sklearnserver/model.py @@ -52,6 +52,7 @@ def load(self) -> bool: return self.ready def predict(self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None) -> Union[Dict, InferResponse]: + response_headers = {'my-header': 'sample'} try: instances = get_predict_input(payload) if os.environ.get(ENV_PREDICT_PROBA, "false").lower() == "true" and \ @@ -59,6 +60,6 @@ def predict(self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = result = self._model.predict_proba(instances) else: result = self._model.predict(instances) - return get_predict_response(payload, result, self.name) + return get_predict_response(payload, result, self.name), response_headers except Exception as e: raise InferenceError(str(e)) diff --git a/python/sklearnserver/sklearnserver/test_model.py b/python/sklearnserver/sklearnserver/test_model.py index dc2f4d3b9b5..230e6972e00 100644 --- a/python/sklearnserver/sklearnserver/test_model.py +++ b/python/sklearnserver/sklearnserver/test_model.py @@ -45,7 +45,7 @@ def _run_pickle_model(model_dir, model_name): model = SKLearnModel("model", model_dir) model.load() request = data[0:1].tolist() - response = model.predict({"instances": request}) + response, headers = model.predict({"instances": request}) assert response["predictions"] == [0] @@ -56,7 +56,7 @@ def test_model_joblib(): model = SKLearnModel("model", JOBLIB_FILE[0]) model.load() request = data[0:1].tolist() - response = model.predict({"instances": 
+    response, headers = model.predict({"instances": request})
     assert response["predictions"] == [0]
 
 
@@ -66,7 +66,7 @@ def test_mixedtype_model_joblib():
     request = [{'MSZoning': ['RL'], 'LotArea': [8450], 'LotShape': ['Reg'], 'Utilities': ['AllPub'],
                 'YrSold': [2008], 'Neighborhood': ['CollgCr'], 'OverallQual': [7], 'YearBuilt': [2003],
                 'SaleType': ['WD'], 'GarageArea': [548]}]
-    response = model.predict({"instances": request})
+    response, headers = model.predict({"instances": request})
     assert response["predictions"] == [12.202832815138274]
 
 
diff --git a/python/test_resources/graph/success_200_isvc/model.py b/python/test_resources/graph/success_200_isvc/model.py
index bf2a4d6fe7f..0ec88cf15ce 100644
--- a/python/test_resources/graph/success_200_isvc/model.py
+++ b/python/test_resources/graph/success_200_isvc/model.py
@@ -32,7 +32,8 @@ def load(self):
         self.ready = True
 
     def predict(self, payload: Union[Dict, InferRequest, ModelInferRequest], headers) -> Dict:
-        return {"message": "SUCCESS"}
+        response_headers = {}
+        return {"message": "SUCCESS"}, response_headers
 
 
 parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])
diff --git a/python/xgbserver/xgbserver/model.py b/python/xgbserver/xgbserver/model.py
index 1c4c5ce1334..6bfff5915a4 100644
--- a/python/xgbserver/xgbserver/model.py
+++ b/python/xgbserver/xgbserver/model.py
@@ -58,11 +58,12 @@ def load(self) -> bool:
         return self.ready
 
     def predict(self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None) -> Union[Dict, InferResponse]:
+        response_headers = {}
         try:
             # Use of list as input is deprecated see https://github.com/dmlc/xgboost/pull/3970
             instances = get_predict_input(payload)
             dmatrix = xgb.DMatrix(instances, nthread=self.nthread)
             result = self._booster.predict(dmatrix)
-            return get_predict_response(payload, result, self.name)
+            return get_predict_response(payload, result, self.name), response_headers
         except Exception as e:
             raise InferenceError(str(e))
diff --git a/python/xgbserver/xgbserver/test_model.py b/python/xgbserver/xgbserver/test_model.py
index 1129aa12b43..87ae3aca735 100644
--- a/python/xgbserver/xgbserver/test_model.py
+++ b/python/xgbserver/xgbserver/test_model.py
@@ -39,5 +39,5 @@ def test_model():
     model = XGBoostModel("model", model_dir, NTHREAD)
     model.load()
     request = [X[0].tolist()]
-    response = model.predict({"instances": request})
+    response, response_headers = model.predict({"instances": request})
     assert response["predictions"] == [0]
diff --git a/test/e2e/common/utils.py b/test/e2e/common/utils.py
index cfa990b83e2..5ae9efa0f56 100644
--- a/test/e2e/common/utils.py
+++ b/test/e2e/common/utils.py
@@ -61,7 +61,7 @@ def grpc_stub(service_name, namespace):
 
 
 def predict(service_name, input_json, protocol_version="v1",
-            version=constants.KSERVE_V1BETA1_VERSION, model_name=None):
+            version=constants.KSERVE_V1BETA1_VERSION, model_name=None, return_response_headers=False):
     with open(input_json) as json_file:
         data = json.load(json_file)
 
@@ -69,11 +69,11 @@ def predict(service_name, input_json, protocol_version="v1",
                        input_json=json.dumps(data),
                        protocol_version=protocol_version,
                        version=version,
-                       model_name=model_name)
+                       model_name=model_name, return_response_headers=return_response_headers)
 
 
 def predict_str(service_name, input_json, protocol_version="v1",
-                version=constants.KSERVE_V1BETA1_VERSION, model_name=None):
+                version=constants.KSERVE_V1BETA1_VERSION, model_name=None, return_response_headers=False):
     kfs_client = KServeClient(
         config_file=os.environ.get("KUBECONFIG", "~/.kube/config"))
"~/.kube/config")) isvc = kfs_client.get( @@ -97,9 +97,12 @@ def predict_str(service_name, input_json, protocol_version="v1", logging.info("Sending url = %s", url) logging.info("Sending request data: %s", input_json) response = requests.post(url, input_json, headers=headers) - logging.info("Got response code %s, content %s", response.status_code, response.content) + logging.info("Got response code %s, content %s", + response.status_code, response.content) if response.status_code == 200: preds = json.loads(response.content.decode("utf-8")) + if return_response_headers: + return preds, response.headers return preds else: response.raise_for_status() @@ -126,7 +129,8 @@ def predict_ig(ig_name, input_json, protocol_version="v1", logging.info("Sending url = %s", url) logging.info("Sending request data: %s", input_json) response = requests.post(url, data, headers=headers) - logging.info("Got response code %s, content %s", response.status_code, response.content) + logging.info("Got response code %s, content %s", + response.status_code, response.content) if response.status_code == 200: preds = json.loads(response.content.decode("utf-8")) return preds @@ -134,16 +138,20 @@ def predict_ig(ig_name, input_json, protocol_version="v1", response.raise_for_status() -def explain(service_name, input_json): - return explain_response(service_name, input_json)["data"]["precision"] +def explain(service_name, input_json, return_response_headers=False): + + if return_response_headers: + return explain_response(service_name, input_json, return_response_headers) + else: + return explain_response(service_name, input_json, return_response_headers)["data"]["precision"] -def explain_art(service_name, input_json): +def explain_art(service_name, input_json, return_response_headers=False): return explain_response( - service_name, input_json)["explanations"]["adversarial_prediction"] + service_name, input_json, return_response_headers)["explanations"]["adversarial_prediction"] -def explain_response(service_name, input_json): +def explain_response(service_name, input_json, return_response_headers): kfs_client = KServeClient( config_file=os.environ.get("KUBECONFIG", "~/.kube/config")) isvc = kfs_client.get( @@ -168,6 +176,8 @@ def explain_response(service_name, input_json): ) if response.status_code == 200: json_response = json.loads(response.content.decode("utf-8")) + if return_response_headers: + return json_response, response.headers else: response.raise_for_status() except (RuntimeError, json.decoder.JSONDecodeError) as e: diff --git a/test/e2e/predictor/test_response_headers.py b/test/e2e/predictor/test_response_headers.py new file mode 100644 index 00000000000..e67b306bf4a --- /dev/null +++ b/test/e2e/predictor/test_response_headers.py @@ -0,0 +1,119 @@ +import logging +import os + +import pytest +from kubernetes import client +from kubernetes.client import V1ResourceRequirements + +from kserve import (KServeClient, V1beta1InferenceService, + V1beta1InferenceServiceSpec, V1beta1ModelFormat, + V1beta1ModelSpec, V1beta1PredictorSpec, V1beta1SKLearnSpec, + constants, V1beta1ExplainerSpec, V1beta1AlibiExplainerSpec) + + +from ..common.utils import KSERVE_TEST_NAMESPACE, explain, predict + + +@pytest.mark.slow +def test_predictor_response_headers(): + service_name = "isvc-sklearn-v2" + + predictor = V1beta1PredictorSpec( + min_replicas=1, + model=V1beta1ModelSpec( + model_format=V1beta1ModelFormat( + name="sklearn", + ), + runtime="kserve-sklearnserver", + image="andyi2it/sklearn-headers:latest", + 
storage_uri="gs://seldon-models/sklearn/mms/lr_model", + resources=V1ResourceRequirements( + requests={"cpu": "50m", "memory": "128Mi"}, + limits={"cpu": "100m", "memory": "512Mi"}, + ), + ), + ) + + isvc = V1beta1InferenceService( + api_version=constants.KSERVE_V1BETA1, + kind=constants.KSERVE_KIND, + metadata=client.V1ObjectMeta( + name=service_name, namespace=KSERVE_TEST_NAMESPACE + ), + spec=V1beta1InferenceServiceSpec(predictor=predictor), + ) + + kserve_client = KServeClient( + config_file=os.environ.get("KUBECONFIG", "~/.kube/config")) + kserve_client.create(isvc) + kserve_client.wait_isvc_ready( + service_name, namespace=KSERVE_TEST_NAMESPACE) + + response_content, response_headers = predict( + service_name, "./data/iris_input_v2.json", protocol_version="v2", return_response_headers=True) + + assert "my-header" in response_headers + assert response_headers["my-header"] == "sample" + assert response_content["outputs"][0]["data"] == [1, 1] + + kserve_client.delete(service_name, KSERVE_TEST_NAMESPACE) + + +@pytest.mark.slow +def test_explainer_response_headers(): + service_name = 'isvc-explainer-tabular' + predictor = V1beta1PredictorSpec( + sklearn=V1beta1SKLearnSpec( + image="andyi2it/sklearn-headers:latest", + storage_uri='gs://kfserving-examples/models/sklearn/1.3/income/model', + resources=V1ResourceRequirements( + requests={'cpu': '100m', 'memory': '256Mi'}, + limits={'cpu': '250m', 'memory': '512Mi'} + ) + ) + ) + explainer = V1beta1ExplainerSpec( + min_replicas=1, + alibi=V1beta1AlibiExplainerSpec( + image="andyi2it/alibiexplainer-headers:latest", + name='kserve-container', + type='AnchorTabular', + storage_uri='gs://kfserving-examples/models/sklearn/1.3/income/explainer', + resources=V1ResourceRequirements( + requests={'cpu': '100m', 'memory': '256Mi'}, + limits={'cpu': '250m', 'memory': '512Mi'} + ) + ) + ) + + isvc = V1beta1InferenceService(api_version=constants.KSERVE_V1BETA1, + kind=constants.KSERVE_KIND, + metadata=client.V1ObjectMeta( + name=service_name, namespace=KSERVE_TEST_NAMESPACE), + spec=V1beta1InferenceServiceSpec(predictor=predictor, explainer=explainer)) + + kserve_client = KServeClient( + config_file=os.environ.get("KUBECONFIG", "~/.kube/config")) + kserve_client.create(isvc) + try: + kserve_client.wait_isvc_ready( + service_name, namespace=KSERVE_TEST_NAMESPACE, timeout_seconds=720) + except RuntimeError as e: + logging.info(kserve_client.api_instance.get_namespaced_custom_object("serving.knative.dev", "v1", + KSERVE_TEST_NAMESPACE, "services", + service_name + "-predictor-default")) + pods = kserve_client.core_api.list_namespaced_pod(KSERVE_TEST_NAMESPACE, + label_selector='serving.kserve.io/inferenceservice={}'.format( + service_name)) + for pod in pods.items: + logging.info(pod) + raise e + + response_content = predict(service_name, './data/income_input.json') + assert (response_content["predictions"] == [0]) + precision, response_headers = explain( + service_name, './data/income_input.json', return_response_headers=True) + assert "my-header" in response_headers + assert response_headers["my-header"] == "sample" + assert (precision["data"]["precision"] > 0.9) + kserve_client.delete(service_name, KSERVE_TEST_NAMESPACE)