Skip to content

Commit

Permalink
kafka: minor fixes on ACL
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Feb 27, 2024
1 parent 83d33a9 commit 8ce240c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 48 deletions.
19 changes: 9 additions & 10 deletions hstream-kafka/HStream/Kafka/Common/Acl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ instance Enum AclPermissionType where
-- Used in both 'AccessControlEntry' and 'AccessControlEntryFilter',
-- with slightly different field requirements.
data AccessControlEntryData = AccessControlEntryData
{ aceDataPrincipal :: Text
, aceDataHost :: Text
, aceDataOperation :: AclOperation
, aceDataPermissionType :: AclPermissionType
{ aceDataPrincipal :: !Text
, aceDataHost :: !Text
, aceDataOperation :: !AclOperation
, aceDataPermissionType :: !AclPermissionType
} deriving (Eq, Ord)
instance Show AccessControlEntryData where
show AccessControlEntryData{..} =
Expand Down Expand Up @@ -190,8 +190,8 @@ instance Matchable AccessControlEntry AccessControlEntryFilter where

-- | A binding between a resource pattern and an access control entry (ACE).
data AclBinding = AclBinding
{ aclBindingResourcePattern :: ResourcePattern
, aclBindingACE :: AccessControlEntry
{ aclBindingResourcePattern :: !ResourcePattern
, aclBindingACE :: !AccessControlEntry
} deriving (Eq, Ord)
instance Show AclBinding where
show AclBinding{..} =
Expand All @@ -200,8 +200,8 @@ instance Show AclBinding where

-- | A filter which can match 'AclBinding's.
data AclBindingFilter = AclBindingFilter
{ aclBindingFilterResourcePatternFilter :: ResourcePatternFilter
, aclBindingFilterACEFilter :: AccessControlEntryFilter
{ aclBindingFilterResourcePatternFilter :: !ResourcePatternFilter
, aclBindingFilterACEFilter :: !AccessControlEntryFilter
}
instance Show AclBindingFilter where
show AclBindingFilter{..} =
Expand All @@ -225,11 +225,10 @@ instance Matchable AclBinding AclBindingFilter where
}
anyFilter = AclBindingFilter anyFilter anyFilter

-- TODO: validate
-- 1. No UNKNOWN contained
-- 2. resource pattern name is not empty and does not contain '/'
validateAclBinding :: AclBinding -> Either String ()
validateAclBinding AclBinding{..}
| T.null (resPatResourceName aclBindingResourcePattern) = Left "Resource name is empty"
| T.any (== '/') (resPatResourceName aclBindingResourcePattern) = Left "Resource name contains '/'"
| otherwise = Right () -- FIXME
| otherwise = Right ()
18 changes: 9 additions & 9 deletions hstream-kafka/HStream/Kafka/Common/AclEntry.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Security

data AclEntry = AclEntry
{ aclEntryPrincipal :: Principal
, aclEntryHost :: Text
, aclEntryOperation :: AclOperation
, aclEntryPermissionType :: AclPermissionType
{ aclEntryPrincipal :: !Principal
, aclEntryHost :: !Text
, aclEntryOperation :: !AclOperation
, aclEntryPermissionType :: !AclPermissionType
} deriving (Eq, Ord)
instance Show AclEntry where
show AclEntry{..} =
Expand Down Expand Up @@ -63,8 +63,8 @@ defaultVersion :: Version
defaultVersion = 1

data AclResourceNode = AclResourceNode
{ aclResNodeVersion :: Version
, aclResNodeAcls :: Acls
{ aclResNodeVersion :: !Version
, aclResNodeAcls :: !Acls
} deriving (Eq, Show)
instance Aeson.ToJSON AclResourceNode where
toJSON AclResourceNode{..} =
Expand All @@ -78,7 +78,7 @@ instance Aeson.FromJSON AclResourceNode where
parseJSON o = fail $ "Invalid AclResourceNode: " <> show o

data AclCache = AclCache
{ aclCacheAcls :: Map.Map ResourcePattern Acls
, aclCacheResources :: Map.Map (AccessControlEntry,ResourceType,PatternType)
(Set.Set Text)
{ aclCacheAcls :: !(Map.Map ResourcePattern Acls)
, aclCacheResources :: !(Map.Map (AccessControlEntry,ResourceType,PatternType)
(Set.Set Text))
}
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Common/AclStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ loadAllAcls a aclsConsumer = do
aclNodes <- Meta.getAllMeta @AclResourceNode a
forM_ (Map.toList aclNodes) $ \(key, node) -> do
case resourcePatternFromMetadataKey key of
Nothing -> error $ "Invalid key of resource pattern: " <> T.unpack key
Nothing -> error $ "Invalid key of resource pattern: " <> T.unpack key -- FIXME: error
Just resPat -> do
aclsConsumer resPat (aclResNodeAcls node)
20 changes: 11 additions & 9 deletions hstream-kafka/HStream/Kafka/Common/Authorizer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import qualified Kafka.Protocol.Message as K
-- The lock ensures that only one thread can update the cache and the store
-- at the same time.
data AclAuthorizer a = AclAuthorizer
{ authorizerCache :: IORef AclCache
, authorizerMetaStore :: a
, authorizerLock :: MVar ()
{ authorizerCache :: !(IORef AclCache)
, authorizerMetaStore :: !a
, authorizerLock :: !(MVar ())
}

newAclAuthorizer :: IO a -> IO (AclAuthorizer a)
Expand Down Expand Up @@ -251,12 +251,11 @@ aclCreateAcls _ authorizer bindings = withMVar (authorizerLock authorizer) $ \_
validateEachBinding (i, b@AclBinding{..}) = do
case supportExtenedAcl of
False -> do
-- FIXME: ERROR CODE
let result = K.AclCreationResult K.NONE (Just "Adding ACLs on prefixed resource patterns requires version xxx or higher")
-- FIXME: error message
let result = K.AclCreationResult K.UNSUPPORTED_VERSION (Just "Adding ACLs on prefixed resource patterns requires version xxx or higher")
return $ Left (i, result)
True -> case validateAclBinding b of
-- FIXME: ERROR CODE
Left s -> return $ Left (i, K.AclCreationResult K.NONE (Just . T.pack $ s))
Left s -> return $ Left (i, K.AclCreationResult K.INVALID_REQUEST (Just . T.pack $ s))
Right _ -> return $ Right (aclBindingResourcePattern, (i, b))

addAclsForEachRes :: (ResourcePattern, [(Int, AclBinding)]) -> IO [(Int, K.AclCreationResult)]
Expand All @@ -268,7 +267,7 @@ aclCreateAcls _ authorizer bindings = withMVar (authorizerLock authorizer) $ \_
in (newAcls, results)
case results_e of
-- FIXME: ERROR CODE
Left (e :: SomeException) -> return $ L.map (\(i,_) -> (i, K.AclCreationResult K.NONE (Just $ "Failed to update ACLs" <> (T.pack (show e))))) bs
Left (e :: SomeException) -> return $ L.map (\(i,_) -> (i, K.AclCreationResult K.UNKNOWN_SERVER_ERROR (Just $ "Failed to update ACLs" <> (T.pack (show e))))) bs
Right x -> return x

-- | Delete ACls for the given filters.
Expand Down Expand Up @@ -406,7 +405,7 @@ updateResourceAcls authorizer resPat f = do
Log.warning $ "Failed to update ACLs for " <> Log.buildString' resPat <>
". Reading data and retrying update." <>
" error: " <> Log.buildString' e
threadDelay (50 * 1000) -- FIXME: retry interval
threadDelay (500 * 1000) -- FIXME: retry interval
go oldAcls (retries + 1)
Right acls_ -> return acls_
| otherwise = do
Expand Down Expand Up @@ -453,12 +452,15 @@ groupByValue m = Map.foldrWithKey' f Map.empty m
where
f k v acc = Map.insertWith (++) v [k] acc

-- FIXME: configuable
supportExtenedAcl :: Bool
supportExtenedAcl = True

-- FIXME: configuable
superUsers :: Set.Set Principal
superUsers = Set.empty

-- FIXME: configuable
allowIfNoAclFound :: Bool
allowIfNoAclFound = False

Expand Down
14 changes: 6 additions & 8 deletions hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{-# LANGUAGE FunctionalDependencies #-}

module HStream.Kafka.Common.Authorizer.Class where

import Data.Text (Text)
Expand All @@ -13,10 +11,10 @@ import qualified Kafka.Protocol.Message as K
-- Helper types
------------------------------------------------------------
data AclAction = AclAction
{ aclActionResPat :: ResourcePattern
, aclActionOp :: AclOperation
, aclActionLogIfAllowed :: Bool
, aclActionLogIfDenied :: Bool
{ aclActionResPat :: !ResourcePattern
, aclActionOp :: !AclOperation
, aclActionLogIfAllowed :: !Bool
, aclActionLogIfDenied :: !Bool
-- , more...
}
instance Show AclAction where
Expand All @@ -34,8 +32,8 @@ data AuthorizationResult

-- TODO
data AuthorizableRequestContext = AuthorizableRequestContext
{ authReqCtxHost :: Text
, authReqCtxPrincipal :: Principal
{ authReqCtxHost :: !Text
, authReqCtxPrincipal :: !Principal
-- , ...
}

Expand Down
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Common/Resource.hs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ isPatternTypeSpecific _ = False
-- See org.apache.kafka.common.resource.ResourcePattern.
data ResourcePattern = ResourcePattern
{ resPatResourceType :: ResourceType -- | Can not be 'Res_ANY'
, resPatResourceName :: Text -- | Can not be null but can be 'WILDCARD' -- FIXME: which?
, resPatResourceName :: Text -- | Can not be null but can be 'WILDCARD'
, resPatPatternType :: PatternType -- | Can not be 'Pat_ANY' or 'Pat_MATCH'
} deriving (Eq)
instance Show ResourcePattern where
Expand All @@ -147,7 +147,7 @@ instance Show ResourcePattern where
", name=" <> T.unpack resPatResourceName <>
", patternType=" <> show resPatPatternType <> ")"

-- FIXME: design a proper serialization format for 'ResourcePattern'.
-- FIXME then: design a proper serialization format for 'ResourcePattern'.
-- WARNING: the resource name may contain '_'!
-- | Convert a 'ResourcePattern' to a metadata key typed 'Text'.
-- A wildcard resource name "*" will be converted to "AnyResource".
Expand Down Expand Up @@ -192,7 +192,7 @@ data ResourcePatternFilter = ResourcePatternFilter
-- Otherwise, only match patterns with the same resource type.
, resPatFilterResourceName :: Text
-- | The resource name to match. If null, ignore the resource name.
-- If 'WILDCARD', only match wildcard patterns. -- FIXME: which WILDCARD?
-- If 'WILDCARD', only match wildcard patterns.
, resPatFilterPatternType :: PatternType
-- | The resource pattern type to match.
-- If 'Pat_ANY', match ignore the pattern type.
Expand Down
12 changes: 4 additions & 8 deletions hstream-kafka/tests/HStream/Kafka/Common/AuthorizerSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import HStream.Kafka.Common.Authorizer.Class
import HStream.Kafka.Common.Resource hiding (match)
import HStream.Kafka.Common.Security
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K

spec :: Spec
Expand Down Expand Up @@ -72,8 +73,7 @@ specWithAuthorizer storeType =
it "create acl with empty resource name should fail" $ \a -> do
K.CreateAclsResponse{..} <-
createAcls baseReqCtx a [allowReadAcl `on` ("" `typed` Res_GROUP `match` Pat_LITERAL)]
-- FIXME: error code
results `shouldBe` K.KaArray (Just . V.singleton $ K.AclCreationResult 0 (Just "Resource name is empty"))
results `shouldBe` K.KaArray (Just . V.singleton $ K.AclCreationResult K.INVALID_REQUEST (Just "Resource name is empty"))
it "deny rule should override allow rule" $ \a -> do
let host = "192.168.2.1"
principal = Principal "User" baseUsername
Expand All @@ -92,9 +92,5 @@ specWithAuthorizer storeType =
void $ createAcls reqCtx a [allowAll `on` resource]
authorize reqCtx a [AclOp_READ `on` resource] `shouldReturn` [Authz_ALLOWED]
it "delete all acls" $ \a -> do
let anyResPatFilter = ResourcePatternFilter Res_ANY wildcardResourceName Pat_LITERAL
anyAceFilter = AccessControlEntryFilter $
AccessControlEntryData "" ""AclOp_ANY AclPerm_ANY
let anyAclBindingFilter = AclBindingFilter anyResPatFilter anyAceFilter
void $ deleteAcls baseReqCtx a [anyAclBindingFilter]
getAcls baseReqCtx a anyAclBindingFilter `shouldReturn` []
void $ deleteAcls baseReqCtx a [anyFilter]
getAcls baseReqCtx a anyFilter `shouldReturn` []

0 comments on commit 8ce240c

Please sign in to comment.