From 4c41189c915a9d108cd048f1e887b551c3d7b528 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Thu, 17 Oct 2024 09:55:50 +0300 Subject: [PATCH] fix: validate REST Proxy subscription param types --- src/karapace/kafka_rest_apis/consumer_manager.py | 14 ++++++++++++++ tests/integration/test_rest_consumer.py | 10 ++++++++++ 2 files changed, 24 insertions(+) diff --git a/src/karapace/kafka_rest_apis/consumer_manager.py b/src/karapace/kafka_rest_apis/consumer_manager.py index b02902e3d..809478f4c 100644 --- a/src/karapace/kafka_rest_apis/consumer_manager.py +++ b/src/karapace/kafka_rest_apis/consumer_manager.py @@ -150,6 +150,16 @@ def _illegal_state_fail(message: str, content_type: str) -> None: message=message, ) + @staticmethod + def _unprocessable_entity(*, message: str, content_type: str) -> None: + ConsumerManager._assert( + cond=False, + code=HTTPStatus.UNPROCESSABLE_ENTITY, + sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, + content_type=content_type, + message=message, + ) + # external api below # CONSUMER async def create_consumer(self, group_name: str, request_data: dict, content_type: str): @@ -318,7 +328,11 @@ async def set_subscription(self, internal_name: tuple[str, str], content_type: s LOG.info("Updating subscription for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) topics = request_data.get("topics", []) + if topics and not isinstance(topics, list): + self._unprocessable_entity(message="Topics is expected to be list of strings", content_type=content_type) topics_pattern = request_data.get("topic_pattern") + if topics_pattern and not isinstance(topics_pattern, str): + self._unprocessable_entity(message="Topic patterns is expected to be a string", content_type=content_type) if not (topics or topics_pattern): self._illegal_state_fail( message="Neither topic_pattern nor topics are present in request", content_type=content_type diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index 1c5f6083a..f0003dbdd 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -167,6 +167,16 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): res = await rest_async_client.post(assign_path, headers=REST_HEADERS["json"], json=assign_payload) assert res.status_code == 409, "Expecting status code 409 on assign after subscribe on the same consumer instance" + # topics parameter is expected to be array, 4xx error returned + res = await rest_async_client.post(sub_path, json={"topics": topic_name}, headers=REST_HEADERS["json"]) + assert res.status_code == 422, "Expecting status code 422 on subscription update with invalid topics param" + + # topic pattern parameter is expected to be a string, 4xx error returned + res = await rest_async_client.post( + sub_path, json={"topic_pattern": ["not", "a", "string"]}, headers=REST_HEADERS["json"] + ) + assert res.status_code == 422, "Expecting status code 422 on subscription update with invalid topics param" + @pytest.mark.parametrize("trail", ["", "/"]) async def test_seek(rest_async_client, admin_client, trail):