From 18948be44caa1116faf6b91086054f2de8d5e9d1 Mon Sep 17 00:00:00 2001 From: Commelina Date: Tue, 12 Mar 2024 16:24:17 +0800 Subject: [PATCH] kafka: do authorization on describe configs (32) (#1777) --- .../Kafka/Server/Config/KafkaConfigManager.hs | 12 ++++--- .../HStream/Kafka/Server/Handler/Basic.hs | 33 ++++++++++++++++--- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs index c77fe25e3..37fba270f 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs @@ -42,7 +42,7 @@ listTopicConfigs KafkaConfigManager{..} topic keys = do let keys' = fromMaybe (V.fromList $ Map.keys KC.allTopicConfigs) (K.unKaArray keys) configs' = convertConfigs configs case V.mapM (getConfig configs') keys' of - Left msg -> return $ getErrorResponse KC.TOPIC topic msg + Left msg -> return $ getErrorResponse KC.TOPIC topic K.INVALID_CONFIG msg Right configsInResp -> return $ K.DescribeConfigsResult { configs=K.NonNullKaArray configsInResp , errorCode=0 @@ -64,10 +64,14 @@ listTopicConfigs KafkaConfigManager{..} topic keys = do getConfig :: Map.Map T.Text (Maybe T.Text) -> T.Text -> Either T.Text K.DescribeConfigsResourceResult getConfig configs configName = getConfigByInstance <$> KC.getTopicConfig configName configs -getErrorResponse :: KC.KafkaConfigResource -> T.Text -> T.Text -> K.DescribeConfigsResult -getErrorResponse rt rn msg = K.DescribeConfigsResult +getErrorResponse :: KC.KafkaConfigResource + -> T.Text + -> K.ErrorCode + -> T.Text + -> K.DescribeConfigsResult +getErrorResponse rt rn code msg = K.DescribeConfigsResult { configs=K.NonNullKaArray V.empty - , errorCode=K.INVALID_CONFIG + , errorCode=code , resourceName=rn , errorMessage=Just msg , resourceType=fromIntegral . fromEnum $ rt diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs index 6421d327e..273d2c79d 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs @@ -244,22 +244,45 @@ handleMetadata ctx reqCtx req = do --------------------------------------------------------------------------- -- 32: DescribeConfigs --------------------------------------------------------------------------- +-- FIXME: This function does not catch any Kafka ErrorCodeException. +-- Is this proper? +-- FIXME: Recheck if returned error codes and messages are proper. +-- See kafka.server.ConfigHelper#handleDescribeConfigsRequest handleDescribeConfigs :: ServerContext -> K.RequestContext -> K.DescribeConfigsRequest -> IO K.DescribeConfigsResponse -handleDescribeConfigs serverCtx _ req = do +handleDescribeConfigs serverCtx reqCtx req = do manager <- KCM.mkKafkaConfigManager serverCtx.scLDClient serverCtx.kafkaBrokerConfigs results <- V.forM (Utils.kaArrayToVector req.resources) $ \resource -> do case toEnum (fromIntegral resource.resourceType) of - KC.TOPIC -> KCM.listTopicConfigs manager resource.resourceName resource.configurationKeys + KC.TOPIC -> do + -- [ACL] check [DESCRIBE_CONFIGS TOPIC] + K.simpleAuthorize (K.toAuthorizableReqCtx reqCtx) serverCtx.authorizer K.Res_TOPIC resource.resourceName K.AclOp_DESCRIBE_CONFIGS >>= \case + False -> return $ KCM.getErrorResponse KC.TOPIC + resource.resourceName + K.TOPIC_AUTHORIZATION_FAILED + "Topic authorization failed." + True -> + KCM.listTopicConfigs manager resource.resourceName resource.configurationKeys KC.BROKER -> do + -- FIXME: authorize [DESCRIBE_CONFIGS CLUSTER] first if T.pack (show serverCtx.serverID) == resource.resourceName then KCM.listBrokerConfigs manager resource.resourceName resource.configurationKeys - else return $ KCM.getErrorResponse KC.BROKER resource.resourceName ("invalid broker id:" <> resource.resourceName) - rt -> return $ KCM.getErrorResponse rt resource.resourceName ("unsupported resource type:" <> T.pack (show rt)) - return $ K.DescribeConfigsResponse {results=K.NonNullKaArray results, throttleTimeMs=0} + else return $ KCM.getErrorResponse KC.BROKER + resource.resourceName + K.INVALID_REQUEST + ("Unexpected broker id, expected " <> (T.pack (show serverCtx.serverID)) <> " but received " <> resource.resourceName) + rt -> return $ KCM.getErrorResponse rt + resource.resourceName + K.INVALID_REQUEST + ("Unexpected resource type " <> T.pack (show rt) <> " for resouce" <> resource.resourceName) + + return $ K.DescribeConfigsResponse { + results = K.NonNullKaArray results + , throttleTimeMs = 0 + } --------------------------------------------------------------------------- -- 32: FindCoordinator