Skip to content

Commit

Permalink
Merge pull request Aiven-Open#558 from aiven/misc-error-handling
Browse files Browse the repository at this point in the history
Miscellaneous error handling fixes
  • Loading branch information
jjaakola-aiven authored Mar 13, 2023
2 parents 88bee3a + 90a360a commit 8727294
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 8 deletions.
12 changes: 11 additions & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
UnknownTopicOrPartitionError,
)
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.consumer_manager import ConsumerManager
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
Expand Down Expand Up @@ -779,11 +780,20 @@ async def validate_schema_info(self, data: dict, prefix: str, content_type: str,
KafkaRest.r(
body={
"error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value,
"message": f"Error when registering schema. format = {schema_type}, subject = {topic}-{prefix}",
"message": f"Error when registering schema. format = {schema_type.value}, subject = {topic}-{prefix}",
},
content_type=content_type,
status=HTTPStatus.REQUEST_TIMEOUT,
)
except InvalidSchema:
KafkaRest.r(
body={
"error_code": RESTErrorCodes.INVALID_DATA.value,
"message": f'Invalid schema. format = {schema_type.value}, schema = {data[f"{prefix}_schema"]}',
},
content_type=content_type,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

async def _prepare_records(
self,
Expand Down
4 changes: 3 additions & 1 deletion karapace/rapu.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ def check_rest_headers(self, request: HTTPRequest) -> dict:
result["requests"] = header_info
result["accepts"] = accept_matcher.groupdict()
return result
self.log.error("Not acceptable: %r", request.get_header("accept"))
http_error(
message="HTTP 406 Not Acceptable",
content_type=result["content_type"],
Expand Down Expand Up @@ -331,6 +330,9 @@ async def _handle_request(
data = ex.body
status = ex.status
headers = ex.headers
except asyncio.CancelledError:
# Re-raise if aiohttp cancelled the task (e.g. client disconnected) without internal server error
raise
except: # pylint: disable=bare-except
self.log.exception("Internal server error")
headers = {"Content-Type": "application/json"}
Expand Down
16 changes: 13 additions & 3 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

@unique
class SchemaErrorCodes(Enum):
HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value
HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value
HTTP_CONFLICT = HTTPStatus.CONFLICT.value
HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value
Expand Down Expand Up @@ -835,11 +836,11 @@ def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any
if not isinstance(body, dict):
self.r(
body={
"error_code": SchemaErrorCodes.HTTP_INTERNAL_SERVER_ERROR.value,
"message": "Internal Server Error",
"error_code": SchemaErrorCodes.HTTP_BAD_REQUEST.value,
"message": "Malformed request",
},
content_type=content_type,
status=HTTPStatus.INTERNAL_SERVER_ERROR,
status=HTTPStatus.BAD_REQUEST,
)
for attr in body:
if attr not in {"schema", "schemaType"}:
Expand All @@ -853,6 +854,15 @@ def _validate_schema_request_body(self, content_type: str, body: Union[dict, Any
)

def _validate_schema_type(self, content_type: str, data: JsonData) -> SchemaType:
if not isinstance(data, dict):
self.r(
body={
"error_code": SchemaErrorCodes.HTTP_BAD_REQUEST.value,
"message": "Malformed request",
},
content_type=content_type,
status=HTTPStatus.BAD_REQUEST,
)
schema_type_unparsed = data.get("schemaType", SchemaType.AVRO.value)
try:
schema_type = SchemaType(schema_type_unparsed)
Expand Down
12 changes: 9 additions & 3 deletions tests/integration/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ async def test_compatibility_endpoint(registry_async_client: Client, trail: str)
subject = create_subject_name_factory(f"test_compatibility_endpoint-{trail}")()
schema_name = create_schema_name_factory(f"test_compatibility_endpoint_{trail}")()

res = await registry_async_client.post(
f"subjects/{subject}/versions{trail}",
json=-1,
)
assert res.status_code == 400

schema = {
"type": "record",
"name": schema_name,
Expand Down Expand Up @@ -2220,9 +2226,9 @@ async def test_schema_body_validation(registry_async_client: Client) -> None:
assert res.json()["message"] == "Unrecognized field: invalid_field"
# Invalid body type
res = await registry_async_client.post(endpoint, json="invalid")
assert res.status_code == 500
assert res.json()["error_code"] == 500
assert res.json()["message"] == "Internal Server Error"
assert res.status_code == 400
assert res.json()["error_code"] == 400
assert res.json()["message"] == "Malformed request"


async def test_version_number_validation(registry_async_client: Client) -> None:
Expand Down

0 comments on commit 8727294

Please sign in to comment.