diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 82cbef558..2e44065b9 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -13,6 +13,7 @@ UnknownTopicOrPartitionError, ) from karapace.config import Config, create_client_ssl_context +from karapace.errors import InvalidSchema from karapace.kafka_rest_apis.admin import KafkaRestAdminClient from karapace.kafka_rest_apis.consumer_manager import ConsumerManager from karapace.kafka_rest_apis.error_codes import RESTErrorCodes @@ -779,11 +780,20 @@ async def validate_schema_info(self, data: dict, prefix: str, content_type: str, KafkaRest.r( body={ "error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value, - "message": f"Error when registering schema. format = {schema_type}, subject = {topic}-{prefix}", + "message": f"Error when registering schema. format = {schema_type.value}, subject = {topic}-{prefix}", }, content_type=content_type, status=HTTPStatus.REQUEST_TIMEOUT, ) + except InvalidSchema: + KafkaRest.r( + body={ + "error_code": RESTErrorCodes.INVALID_DATA.value, + "message": f'Invalid schema. format = {schema_type.value}, schema = {data[f"{prefix}_schema"]}', + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) async def _prepare_records( self, diff --git a/karapace/rapu.py b/karapace/rapu.py index 4cd81fe59..a791e498e 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -213,7 +213,6 @@ def check_rest_headers(self, request: HTTPRequest) -> dict: result["requests"] = header_info result["accepts"] = accept_matcher.groupdict() return result - self.log.error("Not acceptable: %r", request.get_header("accept")) http_error( message="HTTP 406 Not Acceptable", content_type=result["content_type"], @@ -331,6 +330,9 @@ async def _handle_request( data = ex.body status = ex.status headers = ex.headers + except asyncio.CancelledError: + # Re-raise if aiohttp cancelled the task (e.g. client disconnected) without internal server error + raise except: # pylint: disable=bare-except self.log.exception("Internal server error") headers = {"Content-Type": "application/json"} diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 98f6fdae6..1532b7838 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -38,6 +38,7 @@ @unique class SchemaErrorCodes(Enum): + HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value HTTP_CONFLICT = HTTPStatus.CONFLICT.value HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value @@ -835,11 +836,11 @@ def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any if not isinstance(body, dict): self.r( body={ - "error_code": SchemaErrorCodes.HTTP_INTERNAL_SERVER_ERROR.value, - "message": "Internal Server Error", + "error_code": SchemaErrorCodes.HTTP_BAD_REQUEST.value, + "message": "Malformed request", }, content_type=content_type, - status=HTTPStatus.INTERNAL_SERVER_ERROR, + status=HTTPStatus.BAD_REQUEST, ) for attr in body: if attr not in {"schema", "schemaType"}: @@ -853,6 +854,15 @@ def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any ) def _validate_schema_type(self, content_type: str, data: JsonData) -> SchemaType: + if not isinstance(data, dict): + self.r( + body={ + "error_code": SchemaErrorCodes.HTTP_BAD_REQUEST.value, + "message": "Malformed request", + }, + content_type=content_type, + status=HTTPStatus.BAD_REQUEST, + ) schema_type_unparsed = data.get("schemaType", SchemaType.AVRO.value) try: schema_type = SchemaType(schema_type_unparsed) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 4e38f7196..f39b4c929 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -288,6 +288,12 @@ async def test_compatibility_endpoint(registry_async_client: Client, trail: str) subject = create_subject_name_factory(f"test_compatibility_endpoint-{trail}")() schema_name = create_schema_name_factory(f"test_compatibility_endpoint_{trail}")() + res = await registry_async_client.post( + f"subjects/{subject}/versions{trail}", + json=-1, + ) + assert res.status_code == 400 + schema = { "type": "record", "name": schema_name, @@ -2220,9 +2226,9 @@ async def test_schema_body_validation(registry_async_client: Client) -> None: assert res.json()["message"] == "Unrecognized field: invalid_field" # Invalid body type res = await registry_async_client.post(endpoint, json="invalid") - assert res.status_code == 500 - assert res.json()["error_code"] == 500 - assert res.json()["message"] == "Internal Server Error" + assert res.status_code == 400 + assert res.json()["error_code"] == 400 + assert res.json()["message"] == "Malformed request" async def test_version_number_validation(registry_async_client: Client) -> None: