Skip to content

Commit

Permalink
kafka: handle errors on ACL management methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Feb 28, 2024
1 parent e001374 commit eecb2d3
Showing 1 changed file with 78 additions and 45 deletions.
123 changes: 78 additions & 45 deletions hstream-kafka/HStream/Kafka/Server/Handler/Security.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,29 @@ module HStream.Kafka.Server.Handler.Security
, handleDeleteAcls
) where

import qualified Data.Vector as V

import HStream.Kafka.Server.Security.SASL (serverSupportedMechanismNames)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

import qualified Control.Exception as E
import Control.Monad
import Data.Function (on)
import qualified Data.List as L
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Vector as V

import HStream.Kafka.Common.Acl
import HStream.Kafka.Common.Authorizer
import HStream.Kafka.Common.Authorizer.Class
import qualified HStream.Kafka.Common.KafkaException as K
import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Security
-------------------------------------------------------------------------------
import HStream.Kafka.Server.Security.SASL (serverSupportedMechanismNames)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-------------------------------------------------------------------------------
handleSaslHandshake :: ServerContext -> K.RequestContext -> K.SaslHandshakeRequest -> IO K.SaslHandshakeResponse
handleSaslHandshake _ _ K.SaslHandshakeRequest{..} = do
-- isLibSupported <- runSASL (serverSupports reqMechanism)
Expand Down Expand Up @@ -58,17 +59,19 @@ handleSaslAuthenticate _ _ _ = do
mempty

-------------------------------------------------------------------------------

toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext
toAuthorizableReqCtx reqCtx =
AuthorizableRequestContext (T.pack reqCtx.clientHost)
(Principal "User" (fromMaybe "" (join reqCtx.clientId)))

-- FIXME: handle error
-- FIXME: error granularity?
handleDescribeAcls :: ServerContext
-> K.RequestContext
-> K.DescribeAclsRequest
-> IO K.DescribeAclsResponse
handleDescribeAcls ctx reqCtx req = do
handleDescribeAcls ctx reqCtx req =
flip E.catches [ E.Handler (\(e :: K.ErrorCodeException) -> do
let (K.ErrorCodeException code) = e
return $ makeErrorResp code (T.pack . show $ e))
, E.Handler (\(e :: E.SomeException) ->
return $ makeErrorResp K.UNKNOWN_SERVER_ERROR (T.pack . show $ e))
] $ do
let aclBindingFilter =
AclBindingFilter
(ResourcePatternFilter (toEnum . fromIntegral $ req.resourceTypeFilter)
Expand All @@ -80,7 +83,7 @@ handleDescribeAcls ctx reqCtx req = do
(toEnum . fromIntegral $ req.operation)
(toEnum . fromIntegral $ req.permissionType)))
let authCtx = toAuthorizableReqCtx reqCtx
aclBindings <- (withAuthorizerObject ctx.authorizer (getAcls authCtx)) aclBindingFilter
aclBindings <- getAcls authCtx ctx.authorizer aclBindingFilter
let xss = L.groupBy ((==) `on` aclBindingResourcePattern) aclBindings
let ress = K.KaArray (Just (V.fromList (aclBindingsToDescribeAclsResource <$> xss)))
return K.DescribeAclsResponse
Expand All @@ -89,41 +92,45 @@ handleDescribeAcls ctx reqCtx req = do
, errorMessage = Just ""
, resources = ress
}
where
makeErrorResp :: K.ErrorCode -> T.Text -> K.DescribeAclsResponse
makeErrorResp code msg =
K.DescribeAclsResponse 0 code (Just msg) (K.KaArray (Just mempty))


----
aclCreationToAclBinding :: K.AclCreation -> AclBinding
aclCreationToAclBinding x =
AclBinding (ResourcePattern (toEnum . fromIntegral $ x.resourceType)
x.resourceName
Pat_LITERAL)
(AccessControlEntry
(AccessControlEntryData x.principal
x.host
(toEnum . fromIntegral $ x.operation)
(toEnum . fromIntegral $ x.permissionType)))

-- FIXME: handle error properly
-- FIXME: error granularity?
handleCreateAcls :: ServerContext
-> K.RequestContext
-> K.CreateAclsRequest
-> IO K.CreateAclsResponse
handleCreateAcls ctx reqCtx req = do
let authCtx = toAuthorizableReqCtx reqCtx
let aclBindings = aclCreationToAclBinding <$> maybe [] V.toList (K.unKaArray req.creations)
(withAuthorizerObject ctx.authorizer (createAcls authCtx)) aclBindings
createAcls authCtx ctx.authorizer aclBindings `E.catches`
[ E.Handler (\(e :: K.ErrorCodeException) -> do
let (K.ErrorCodeException code) = e
return $ makeErrorResp (length aclBindings) code (T.pack . show $ e))
, E.Handler (\(e :: E.SomeException) ->
return $ makeErrorResp (length aclBindings) K.UNKNOWN_SERVER_ERROR (T.pack . show $ e))
]
where
makeErrorResp :: Int -> K.ErrorCode -> T.Text -> K.CreateAclsResponse
makeErrorResp len code msg =
K.CreateAclsResponse 0 (K.KaArray (Just (V.replicate len (K.AclCreationResult code (Just msg)))))

--
deleteAclsFilterToAclBindingFilter :: K.DeleteAclsFilter -> AclBindingFilter
deleteAclsFilterToAclBindingFilter x =
AclBindingFilter (ResourcePatternFilter (toEnum (fromIntegral x.resourceTypeFilter))
(fromMaybe "" x.resourceNameFilter)
Pat_LITERAL)
(AccessControlEntryFilter
(AccessControlEntryData (fromMaybe "" x.principalFilter)
(fromMaybe "" x.hostFilter)
(toEnum (fromIntegral x.operation))
(toEnum (fromIntegral x.permissionType))))
aclCreationToAclBinding :: K.AclCreation -> AclBinding
aclCreationToAclBinding x =
AclBinding (ResourcePattern (toEnum . fromIntegral $ x.resourceType)
x.resourceName
Pat_LITERAL)
(AccessControlEntry
(AccessControlEntryData x.principal
x.host
(toEnum . fromIntegral $ x.operation)
(toEnum . fromIntegral $ x.permissionType)))

-- FIXME: handle error properly
-- FIXME: error granularity?
handleDeleteAcls :: ServerContext
-> K.RequestContext
-> K.DeleteAclsRequest
Expand All @@ -132,4 +139,30 @@ handleDeleteAcls ctx reqCtx req = do
let authCtx = toAuthorizableReqCtx reqCtx
let filters = maybe [] (fmap deleteAclsFilterToAclBindingFilter . V.toList)
(K.unKaArray req.filters)
(withAuthorizerObject ctx.authorizer (deleteAcls authCtx)) filters
deleteAcls authCtx ctx.authorizer filters `E.catches`
[ E.Handler (\(e :: K.ErrorCodeException) -> do
let (K.ErrorCodeException code) = e
return $ makeErrorResp (length filters) code (T.pack . show $ e))
, E.Handler (\(e :: E.SomeException) ->
return $ makeErrorResp (length filters) K.UNKNOWN_SERVER_ERROR (T.pack . show $ e))
]
where
makeErrorResp :: Int -> K.ErrorCode -> T.Text -> K.DeleteAclsResponse
makeErrorResp len code msg =
K.DeleteAclsResponse 0 (K.KaArray (Just (V.replicate len (K.DeleteAclsFilterResult code (Just msg) (K.KaArray (Just mempty))))))

deleteAclsFilterToAclBindingFilter :: K.DeleteAclsFilter -> AclBindingFilter
deleteAclsFilterToAclBindingFilter x =
AclBindingFilter (ResourcePatternFilter (toEnum (fromIntegral x.resourceTypeFilter))
(fromMaybe "" x.resourceNameFilter)
Pat_LITERAL)
(AccessControlEntryFilter
(AccessControlEntryData (fromMaybe "" x.principalFilter)
(fromMaybe "" x.hostFilter)
(toEnum (fromIntegral x.operation))
(toEnum (fromIntegral x.permissionType))))

toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext
toAuthorizableReqCtx reqCtx =
AuthorizableRequestContext (T.pack reqCtx.clientHost)
(Principal "User" (fromMaybe "" (join reqCtx.clientId)))

0 comments on commit eecb2d3

Please sign in to comment.