Skip to content

Commit

Permalink
kafka: implement basic ACL handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Feb 22, 2024
1 parent 3c47e4f commit 2876890
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 42 deletions.
9 changes: 9 additions & 0 deletions common/server/HStream/Common/Server/MetaData.hs
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,29 @@ kafkaZkPaths =
, textToCBytes $ myRootPath @Proto.Timestamp @ZHandle
, textToCBytes $ myRootPath @TaskAllocation @ZHandle
, textToCBytes $ myRootPath @GroupMetadataValue @ZHandle
-- FIXME: hardcoded
, textToCBytes kafkaRootPath <> "/acl"
, textToCBytes kafkaRootPath <> "/acl-extended"
]

kafkaRqTables :: [Text]
kafkaRqTables =
[ myRootPath @TaskAllocation @RHandle
, myRootPath @GroupMetadataValue @RHandle
, myRootPath @Proto.Timestamp @RHandle
-- FIXME: hardcoded
, "acl"
, "acl-extended"
]

kafkaFileTables :: [Text]
kafkaFileTables =
[ myRootPath @TaskAllocation @FHandle
, myRootPath @GroupMetadataValue @FHandle
, myRootPath @Proto.Timestamp @RHandle
-- FIXME: hardcoded
, "acl"
, "acl-extended"
]

initKafkaZkPaths :: HasCallStack => ZHandle -> IO ()
Expand Down
19 changes: 15 additions & 4 deletions hstream-kafka/HStream/Kafka/Common/AclStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ loadAllAcls a aclsConsumer = do
zkAclStorePath :: PatternType -> Text
zkAclStorePath pat =
case pat of
Pat_LITERAL -> "/kafka-acl"
Pat_PREFIXED -> "kafka-acl-extended"
-- FIXME: hardcoded
Pat_LITERAL -> "/hstream/kafka/acl"
Pat_PREFIXED -> "/hstream/kafka/acl-extended"
pat_ -> error $ "Invalid pattern type: " <> show pat_ -- FIXME: error

zkAclStorePath' :: PatternType -> ResourceType -> Text
Expand Down Expand Up @@ -97,8 +98,18 @@ instance AclStore ZHandle where
let path = zkAclStorePath'' resPat
ZK.zooExists zkHandle (Utils.textToCBytes path) >>= \case
-- FIXME: zookeeper acl
Nothing -> void $
ZK.zooCreate zkHandle (Utils.textToCBytes path) (Just (Utils.lazyByteStringToBytes (Aeson.encode node))) ZK.zooOpenAclUnsafe ZK.ZooPersistent
Nothing -> do
-- FIXME: create paths if parent not exist?
void $ ZK.zooCreateIfMissing zkHandle
(Utils.textToCBytes (zkAclStorePath' resPat.resPatPatternType resPat.resPatResourceType))
Nothing
ZK.zooOpenAclUnsafe
ZK.ZooPersistent
void $ ZK.zooCreate zkHandle
(Utils.textToCBytes path)
(Just (Utils.lazyByteStringToBytes (Aeson.encode node)))
ZK.zooOpenAclUnsafe
ZK.ZooPersistent
-- FIXME: check version
Just _ -> void $
ZK.zooSet zkHandle (Utils.textToCBytes path) (Just (Utils.lazyByteStringToBytes (Aeson.encode node))) Nothing
Expand Down
89 changes: 64 additions & 25 deletions hstream-kafka/HStream/Kafka/Common/Authorizer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ initAclAuthorizer authorizer =
atomicModifyIORef' (authorizerCache authorizer)
(\x -> (updateCache x res acls, ()))

------------------------------------------------------------
-- Class instance
------------------------------------------------------------
instance (AclStore a) => Authorizer (AclAuthorizer a) where
createAcls = aclCreateAcls
deleteAcls = aclDeleteAcls
getAcls = aclGetAcls
aclCount = aclAclCount
authorize = aclAuthorize

------------------------------------------------------------
-- Authorizer implementation
------------------------------------------------------------

-- FIXME: Does this function behave the same as Kafka?
-- e.g. List or Set?
-- | Get matching ACLs in cache for the given resource.
Expand Down Expand Up @@ -166,19 +180,19 @@ authorizeAction reqCtx authorizer action@AclAction{..} = do
) False canAllowOps

-- | Authorize a list of ACL actions based on the request context and the given ACL cache.
authorize :: AuthorizableRequestContext
-> AclAuthorizer a
-> [AclAction]
-> IO [AuthorizationResult]
authorize reqCtx authorizer actions =
aclAuthorize :: AuthorizableRequestContext
-> AclAuthorizer a
-> [AclAction]
-> IO [AuthorizationResult]
aclAuthorize reqCtx authorizer actions =
forM actions (authorizeAction reqCtx authorizer)

-- | Get ACL bindings (ACL entry with resource) in cache matching the given filter.
getAcls :: AuthorizableRequestContext
-> AclAuthorizer a
-> AclBindingFilter
-> IO [AclBinding]
getAcls _ AclAuthorizer{..} aclFilter = do
aclGetAcls :: AuthorizableRequestContext
-> AclAuthorizer a
-> AclBindingFilter
-> IO [AclBinding]
aclGetAcls _ AclAuthorizer{..} aclFilter = do
cache <- readIORef authorizerCache
return $ Map.foldrWithKey' f [] (aclCacheAcls cache)
where
Expand All @@ -192,12 +206,12 @@ getAcls _ AclAuthorizer{..} aclFilter = do

-- | Create ACLs for the given bindings.
-- It updates both the cache and the store.
createAcls :: AclStore a
=> AuthorizableRequestContext
-> AclAuthorizer a
-> [AclBinding]
-> IO K.CreateAclsResponse
createAcls _ authorizer bindings = withMVar (authorizerLock authorizer) $ \_ -> do
aclCreateAcls :: AclStore a
=> AuthorizableRequestContext
-> AclAuthorizer a
-> [AclBinding]
-> IO K.CreateAclsResponse
aclCreateAcls _ authorizer bindings = withMVar (authorizerLock authorizer) $ \_ -> do
let bindingsWithIdx = L.zip [0..] bindings
(lefts_, rights_) <- partitionEithers <$> mapM validateEachBinding bindingsWithIdx
let errorResults = Map.fromList lefts_
Expand Down Expand Up @@ -233,17 +247,17 @@ createAcls _ authorizer bindings = withMVar (authorizerLock authorizer) $ \_ ->
in (newAcls, results)
case results_e of
-- FIXME: ERROR CODE
Left (_ :: SomeException) -> return $ L.map (\(i,_) -> (i, K.AclCreationResult K.NONE (Just "Failed to update ACLs"))) bs
Left (e :: SomeException) -> return $ L.map (\(i,_) -> (i, K.AclCreationResult K.NONE (Just $ "Failed to update ACLs" <> (T.pack (show e))))) bs
Right x -> return x

-- | Delete ACls for the given filters.
-- It updates both the cache and the store.
deleteAcls :: AclStore a
=> AuthorizableRequestContext
-> AclAuthorizer a
-> [AclBindingFilter]
-> IO K.DeleteAclsResponse
deleteAcls _ authorizer filters = withMVar (authorizerLock authorizer) $ \_ -> do
aclDeleteAcls :: AclStore a
=> AuthorizableRequestContext
-> AclAuthorizer a
-> [AclBindingFilter]
-> IO K.DeleteAclsResponse
aclDeleteAcls _ authorizer filters = withMVar (authorizerLock authorizer) $ \_ -> do
AclCache{..} <- readIORef (authorizerCache authorizer)
let filtersWithIdx = L.zip [0..] filters
let possibleResources = Map.keys aclCacheAcls <>
Expand Down Expand Up @@ -356,9 +370,10 @@ updateResourceAcls authorizer resPat f = do
return (newAcls, a)
>>= \case
-- FIXME: catch all exceptions?
Left (_ :: SomeException) -> do
Left (e :: SomeException) -> do
Log.warning $ "Failed to update ACLs for " <> Log.buildString' resPat <>
". Reading data and retrying update."
". Reading data and retrying update." <>
" error: " <> Log.buildString' e
threadDelay (50 * 1000) -- FIXME: retry interval
go oldAcls (retries + 1)
Right acls_ -> return acls_
Expand Down Expand Up @@ -391,6 +406,12 @@ updateCache AclCache{..} resPat@ResourcePattern{..} acls =
else Map.insert resPat acls aclCacheAcls
in AclCache newAcls cacheResAfterRemove

-- | Get the current number of ACLs. Return -1 if not implemented.
-- TODO: implement this
aclAclCount :: AuthorizableRequestContext
-> AclAuthorizer a
-> IO Int
aclAclCount _ _ = pure (-1)

------------------------------------------------------------
-- Helper functions
Expand Down Expand Up @@ -432,3 +453,21 @@ logAuditMessage AuthorizableRequestContext{..} AclAction{..} isAuthorized = do
False -> case aclActionLogIfDenied of
True -> Log.info . Log.buildString $ msg
False -> Log.trace . Log.buildString $ msg

----
aceToAclDescription :: AccessControlEntry -> K.AclDescription
aceToAclDescription (AccessControlEntry AccessControlEntryData{..}) =
K.AclDescription
{ principal = aceDataPrincipal
, host = aceDataHost
, operation = fromIntegral (fromEnum aceDataOperation)
, permissionType = fromIntegral (fromEnum aceDataPermissionType)
}

aclBindingsToDescribeAclsResource :: [AclBinding] -> K.DescribeAclsResource
aclBindingsToDescribeAclsResource xs =
K.DescribeAclsResource
{ resourceType = fromIntegral . fromEnum . resPatResourceType . aclBindingResourcePattern $ head xs -- FIXME: L.head
, resourceName = resPatResourceName . aclBindingResourcePattern $ head xs -- FIXME: L.head
, acls = K.KaArray (Just (V.fromList (aceToAclDescription . aclBindingACE <$> xs)))
}
19 changes: 18 additions & 1 deletion hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Security
import qualified Kafka.Protocol.Message as K

------------------------------------------------------------
-- Helper types
------------------------------------------------------------
data AclAction = AclAction
{ aclActionResPat :: ResourcePattern
, aclActionOp :: AclOperation
Expand Down Expand Up @@ -36,6 +39,9 @@ data AuthorizableRequestContext = AuthorizableRequestContext
-- , ...
}

------------------------------------------------------------
-- Abstract authorizer interface
------------------------------------------------------------
class Authorizer s where
-- | Create new ACL bindings.
createAcls :: AuthorizableRequestContext
Expand All @@ -58,10 +64,21 @@ class Authorizer s where
-- | Get the current number of ACLs. Return -1 if not implemented.
aclCount :: AuthorizableRequestContext
-> s
-> Int
-> IO Int

-- | Authorize the specified actions.
authorize :: AuthorizableRequestContext
-> s
-> [AclAction]
-> IO [AuthorizationResult]

------------------------------------------------------------
-- Existential wrapper for Authorizer
------------------------------------------------------------
data AuthorizerObject where
AuthorizerObject :: Authorizer s => s -> AuthorizerObject

withAuthorizerObject :: AuthorizerObject
-> (forall s. Authorizer s => s -> a)
-> a
withAuthorizerObject (AuthorizerObject x) f = f x
10 changes: 10 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ import qualified Kafka.Protocol.Service as K
-- For hstream
#cv_handler HadminCommand, 0, 0

-- ACL
#cv_handler DescribeAcls, 0, 0
#cv_handler CreateAcls, 0, 0
#cv_handler DeleteAcls, 0, 0

handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
Expand Down Expand Up @@ -125,6 +130,11 @@ handlers sc =

-- For hstream
, #mk_handler HadminCommand, 0, 0

-- ACL
, #mk_handler DescribeAcls, 0, 0
, #mk_handler CreateAcls, 0, 0
, #mk_handler DeleteAcls, 0, 0
]

unAuthedHandlers :: ServerContext -> [K.ServiceHandler]
Expand Down
107 changes: 99 additions & 8 deletions hstream-kafka/HStream/Kafka/Server/Handler/Security.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,32 @@ module HStream.Kafka.Server.Handler.Security
( handleSaslHandshake
, handleSaslHandshakeAfterAuth
, handleSaslAuthenticate

, handleDescribeAcls
, handleCreateAcls
, handleDeleteAcls
) where

import qualified Data.Vector as V
import qualified Data.Vector as V

import HStream.Kafka.Server.Security.SASL (serverSupportedMechanismNames)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import HStream.Kafka.Server.Security.SASL (serverSupportedMechanismNames)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

import Control.Monad
import Data.Function (on)
import qualified Data.List as L
import Data.Maybe
import qualified Data.Text as T
import HStream.Kafka.Common.Acl
import HStream.Kafka.Common.Authorizer
import HStream.Kafka.Common.Authorizer.Class
import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Security
-------------------------------------------------------------------------------

handleSaslHandshake :: ServerContext -> K.RequestContext -> K.SaslHandshakeRequest -> IO K.SaslHandshakeResponse
Expand Down Expand Up @@ -42,3 +56,80 @@ handleSaslAuthenticate _ _ _ = do
return $ K.SaslAuthenticateResponse K.ILLEGAL_SASL_STATE
(Just "SaslAuthenticate request received after successful authentication")
mempty

-------------------------------------------------------------------------------

toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext
toAuthorizableReqCtx reqCtx =
AuthorizableRequestContext (T.pack reqCtx.clientHost)
(Principal "User" (fromMaybe "" (join reqCtx.clientId)))

handleDescribeAcls :: ServerContext
-> K.RequestContext
-> K.DescribeAclsRequest
-> IO K.DescribeAclsResponse
handleDescribeAcls ctx reqCtx req = do
let aclBindingFilter =
AclBindingFilter
(ResourcePatternFilter (toEnum . fromIntegral $ req.resourceTypeFilter)
(fromMaybe "" req.resourceNameFilter)
Pat_LITERAL)
(AccessControlEntryFilter
(AccessControlEntryData (fromMaybe "" req.principalFilter)
(fromMaybe "" req.hostFilter)
(toEnum . fromIntegral $ req.operation)
(toEnum . fromIntegral $ req.permissionType)))
let authCtx = toAuthorizableReqCtx reqCtx
aclBindings <- (withAuthorizerObject ctx.authorizer (getAcls authCtx)) aclBindingFilter
let xss = L.groupBy ((==) `on` aclBindingResourcePattern) aclBindings
let ress = K.KaArray (Just (V.fromList (aclBindingsToDescribeAclsResource <$> xss)))
return K.DescribeAclsResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
, errorMessage = Just ""
, resources = ress
}


----
aclCreationToAclBinding :: K.AclCreation -> AclBinding
aclCreationToAclBinding x =
AclBinding (ResourcePattern (toEnum . fromIntegral $ x.resourceType)
x.resourceName
Pat_LITERAL)
(AccessControlEntry
(AccessControlEntryData x.principal
x.host
(toEnum . fromIntegral $ x.operation)
(toEnum . fromIntegral $ x.permissionType)))

handleCreateAcls :: ServerContext
-> K.RequestContext
-> K.CreateAclsRequest
-> IO K.CreateAclsResponse
handleCreateAcls ctx reqCtx req = do
let authCtx = toAuthorizableReqCtx reqCtx
let aclBindings = aclCreationToAclBinding <$> maybe [] V.toList (K.unKaArray req.creations)
(withAuthorizerObject ctx.authorizer (createAcls authCtx)) aclBindings

--
deleteAclsFilterToAclBindingFilter :: K.DeleteAclsFilter -> AclBindingFilter
deleteAclsFilterToAclBindingFilter x =
AclBindingFilter (ResourcePatternFilter (toEnum (fromIntegral x.resourceTypeFilter))
(fromMaybe "" x.resourceNameFilter)
Pat_LITERAL)
(AccessControlEntryFilter
(AccessControlEntryData (fromMaybe "" x.principalFilter)
(fromMaybe "" x.hostFilter)
(toEnum (fromIntegral x.operation))
(toEnum (fromIntegral x.permissionType))))

handleDeleteAcls :: ServerContext
-> K.RequestContext
-> K.DeleteAclsRequest
-> IO K.DeleteAclsResponse
handleDeleteAcls ctx reqCtx req = do
let authCtx = toAuthorizableReqCtx reqCtx
let filters = maybe [] (fmap deleteAclsFilterToAclBindingFilter . V.toList)
(K.unKaArray req.filters)
(withAuthorizerObject ctx.authorizer (deleteAcls authCtx)) filters
Loading

0 comments on commit 2876890

Please sign in to comment.