Skip to content

Commit

Permalink
kafka: basic ACL (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Jan 16, 2024
1 parent fe95d39 commit 44a9b18
Show file tree
Hide file tree
Showing 15 changed files with 1,088 additions and 0 deletions.
1 change: 1 addition & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ constraints:
, entropy == 0.4.1.7 || > 0.4.1.9
-- https://github.com/nikita-volkov/deferred-folds/issues/6
, deferred-folds == 0.9.18.3 || > 0.9.18.4
, base64 <1
94 changes: 94 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Acl.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
{-# LANGUAGE RecordWildCards #-}

module HStream.Kafka.Common.Acl where

import Data.Text (Text)
import qualified Data.Text as T

import HStream.Kafka.Common.Resource

-- [0..14]
data AclOperation
= AclOp_UNKNOWN
| AclOp_ANY
| AclOp_ALL
| AclOp_READ
| AclOp_WRITE
| AclOp_CREATE
| AclOp_DELETE
| AclOp_ALTER
| AclOp_DESCRIBE
| AclOp_CLUSTER_ACTION
| AclOp_DESCRIBE_CONFIGS
| AclOp_ALTER_CONFIGS
| AclOp_IDEMPOTENT_WRITE
| AclOp_CREATE_TOKENS
| AclOp_DESCRIBE_TOKENS
deriving (Eq, Enum, Show)
-- FIXME: Show

-- [0..3]
data AclPermissionType
= AclPerm_UNKNOWN
| AclPerm_ANY -- used in filter
| AclPerm_DENY
| AclPerm_ALLOW
deriving (Eq, Enum, Show)
-- FIXME: Show

-- | Data of an access control entry (ACE), which is a 4-tuple of principal,
-- host, operation and permission type.
-- Used in both 'AccessControlEntry' and 'AccessControlEntryFilter',
-- with slightly different field requirements.
data AccessControlEntryData = AccessControlEntryData
{ aceDataPrincipal :: Text
, aceDataHost :: Text
, aceDataOperation :: AclOperation
, aceDataPermissionType :: AclPermissionType
}
instance Show AccessControlEntryData where
show AccessControlEntryData{..} =
"(principal=" <> s_principal <>
", host=" <> s_host <>
", operation=" <> show aceDataOperation <>
", permissionType=" <> show aceDataPermissionType <> ")"
where s_principal = if T.null aceDataPrincipal then "<any>" else T.unpack aceDataPrincipal
s_host = if T.null aceDataHost then "<any>" else T.unpack aceDataHost

-- | An access control entry (ACE).
-- Requirements: principal and host can not be null.
-- operation can not be 'AclOp_ANY'.
-- permission type can not be 'AclPerm_ANY'.
newtype AccessControlEntry = AccessControlEntry
{ aceData :: AccessControlEntryData
}
instance Show AccessControlEntry where
show AccessControlEntry{..} = show aceData

-- | A filter which matches access control entry(ACE)s.
-- Requirements: principal and host can both be null.
newtype AccessControlEntryFilter = AccessControlEntryFilter
{ aceFilterData :: AccessControlEntryData
}
instance Show AccessControlEntryFilter where
show AccessControlEntryFilter{..} = show aceFilterData

-- | A binding between a resource pattern and an access control entry (ACE).
data AclBinding = AclBinding
{ aclBindingResourcePattern :: ResourcePattern
, aclBindingACE :: AccessControlEntry
}
instance Show AclBinding where
show AclBinding{..} =
"(pattern=" <> show aclBindingResourcePattern <>
", entry=" <> show aclBindingACE <> ")"

-- | A filter which can match 'AclBinding's.
data AclBindingFilter = AclBindingFilter
{ aclBindingFilterResourcePatternFilter :: ResourcePatternFilter
, aclBindingFilterACEFilter :: AccessControlEntryFilter
}
instance Show AclBindingFilter where
show AclBindingFilter{..} =
"(patternFilter=" <> show aclBindingFilterResourcePatternFilter <>
", entryFilter=" <> show aclBindingFilterACEFilter <> ")"
98 changes: 98 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Resource.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
{-# LANGUAGE RecordWildCards #-}

module HStream.Kafka.Common.Resource where

import Data.Text (Text)
import qualified Data.Text as T

-- | A type of resource that can be applied to by an ACL, which is an 'Int8'
-- start from 0.
-- See org.apache.kafka.common.resource.ResourceType.
data ResourceType
= Res_UNKNOWN
| Res_ANY
| Res_TOPIC
| Res_GROUP
| Res_CLUSTER
| Res_TRANSACTIONAL_ID
| Res_DELEGATION_TOKEN
| Res_USER
deriving (Eq, Enum)
instance Show ResourceType where
show Res_UNKNOWN = "UNKNOWN"
show Res_ANY = "ANY"
show Res_TOPIC = "TOPIC"
show Res_GROUP = "GROUP"
show Res_CLUSTER = "CLUSTER"
show Res_TRANSACTIONAL_ID = "TRANSACTIONAL_ID"
show Res_DELEGATION_TOKEN = "DELEGATION_TOKEN"
show Res_USER = "USER"

-- | A cluster resource which is a 2-tuple (type, name).
-- See org.apache.kafka.common.resource.Resource.
data Resource = Resource
{ resResourceType :: ResourceType
, resResourceName :: Text
}
instance Show Resource where
show Resource{..} =
"(resourceType=" <> show resResourceType <>
", name=" <> s_name <> ")"
where s_name = if T.null resResourceName then "<any>" else T.unpack resResourceName

-- [0..4]
-- | A resource pattern type, which is an 'Int8' start from 0.
-- WARNING: Be sure to understand the meaning of 'Pat_MATCH'.
-- A '(TYPE, "name", MATCH)' filter matches the following patterns:
-- 1. All '(TYPE, "name", LITERAL)'
-- 2. All '(TYPE, "*", LITERAL)'
-- 3. All '(TYPE, "name", PREFIXED)'
-- See org.apache.kafka.common.resource.PatternType.
data PatternType
= Pat_UNKNOWN
| Pat_ANY
| Pat_MATCH
| Pat_LITERAL
| Pat_PREFIXED
deriving (Eq, Enum)
instance Show PatternType where
show Pat_UNKNOWN = "UNKNOWN"
show Pat_ANY = "ANY"
show Pat_MATCH = "MATCH"
show Pat_LITERAL = "LITERAL"
show Pat_PREFIXED = "PREFIXED"

-- | A pattern used by ACLs to match resources.
-- See org.apache.kafka.common.resource.ResourcePattern.
data ResourcePattern = ResourcePattern
{ resPatResourceType :: ResourceType -- | Can not be 'Res_ANY'
, resPatResourceName :: Text -- | Can not be null but can be 'WILDCARD' -- FIXME: which?
, resPatPatternType :: PatternType -- | Can not be 'Pat_ANY' or 'Pat_MATCH'
}
instance Show ResourcePattern where
show ResourcePattern{..} =
"ResourcePattern(resourceType=" <> show resPatResourceType <>
", name=" <> T.unpack resPatResourceName <>
", patternType=" <> show resPatPatternType <> ")"

-- | A filter that can match 'ResourcePattern'.
-- See org.apache.kafka.common.resource.ResourcePatternFilter.
data ResourcePatternFilter = ResourcePatternFilter
{ resPatFilterResourceType :: ResourceType
-- | The resource type to match. If 'Res_ANY', ignore the resource type.
-- Otherwise, only match patterns with the same resource type.
, resPatFilterResourceName :: Text
-- | The resource name to match. If null, ignore the resource name.
-- If 'WILDCARD', only match wildcard patterns. -- FIXME: which WILDCARD?
, resPatFilterPatternType :: PatternType
-- | The resource pattern type to match.
-- If 'Pat_ANY', match ignore the pattern type.
-- If 'Pat_MATCH', see 'Pat_MATCH'.
-- Otherwise, match patterns with the same pattern type.
}
instance Show ResourcePatternFilter where
show ResourcePatternFilter{..} =
"ResourcePattern(resourceType=" <> show resPatFilterResourceType <>
", name=" <> s_name <>
", patternType=" <> show resPatFilterPatternType <> ")"
where s_name = if T.null resPatFilterResourceName then "<any>" else T.unpack resPatFilterResourceName
39 changes: 39 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Security/Authorizer.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE RebindableSyntax #-}

module HStream.Kafka.Server.Security.Authorizer where

import Data.Text (Text)
import qualified Data.Text as T
import Data.Kind (Type)

import qualified Kafka.Protocol.Error as K

data AclAction = AclAction
{ aclActionResPat :: ResourcePattern
, aclActionOp :: AclOperation
--, aclActionLogIfAllowed :: Bool
--, more...
}

data AuthorizationResult
= Authz_ALLOWED
| Authz_DENIED
deriving (Eq, Enum, Show)

class Authorizer s :: Type where
-- | Create new ACL bindings.
createAcls :: s -> [AclBinding] -> IO [K.ErrorCode]

-- | Remove matched ACL bindings.
deleteAcls :: s -> [AclBindingFilter] -> IO [K.ErrorCode]

-- | Get matched ACL bindings
getAcls :: s -> AclBindingFilter -> IO [AclBinding]

-- | Get the current number of ACLs. Return -1 if not implemented.
aclCount :: s -> Int

-- | Authorize the specified actions.
authorize :: s -> [AclAction] -> IO [AuthorizationResult]
22 changes: 22 additions & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ library
exposed-modules:
HStream.Kafka.Client.Api
HStream.Kafka.Client.Cli
HStream.Kafka.Common.Resource
HStream.Kafka.Common.Acl
HStream.Kafka.Common.KafkaException
HStream.Kafka.Common.Metrics
HStream.Kafka.Common.OffsetManager
Expand Down Expand Up @@ -236,3 +238,23 @@ library
RecordWildCards
UnliftedFFITypes
UnliftedNewtypes

test-suite kafka-common-test
import: shared-properties
type: exitcode-stdio-1.0
main-is: Spec.hs
hs-source-dirs: tests/common
build-depends:
, base >=4.11 && <5
, bytestring
, tasty
, tasty-hunit
, tasty-quickcheck
, hstream-kafka
default-extensions:
LambdaCase
OverloadedStrings
RecordWildCards
default-language: GHC2021
build-tool-depends: hspec-discover:hspec-discover >=2 && <3
ghc-options: -threaded -rtsopts -with-rtsopts=-N
45 changes: 45 additions & 0 deletions hstream-kafka/message/CreateAclsRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 30,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "CreateAclsRequest",
// Version 1 adds resource pattern type.
// Version 2 enables flexible versions.
// Version 3 adds user resource type.
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "Creations", "type": "[]AclCreation", "versions": "0+",
"about": "The ACLs that we want to create.", "fields": [
{ "name": "ResourceType", "type": "int8", "versions": "0+",
"about": "The type of the resource." },
{ "name": "ResourceName", "type": "string", "versions": "0+",
"about": "The resource name for the ACL." },
{ "name": "ResourcePatternType", "type": "int8", "versions": "1+", "default": "3",
"about": "The pattern type for the ACL." },
{ "name": "Principal", "type": "string", "versions": "0+",
"about": "The principal for the ACL." },
{ "name": "Host", "type": "string", "versions": "0+",
"about": "The host for the ACL." },
{ "name": "Operation", "type": "int8", "versions": "0+",
"about": "The operation type for the ACL (read, write, etc.)." },
{ "name": "PermissionType", "type": "int8", "versions": "0+",
"about": "The permission type for the ACL (allow, deny, etc.)." }
]}
]
}
36 changes: 36 additions & 0 deletions hstream-kafka/message/CreateAclsResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 30,
"type": "response",
"name": "CreateAclsResponse",
// Starting in version 1, on quota violation, brokers send out responses before throttling.
// Version 2 enables flexible versions.
// Version 3 adds user resource type.
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Results", "type": "[]AclCreationResult", "versions": "0+",
"about": "The results for each ACL creation.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The result error, or zero if there was no error." },
{ "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+",
"about": "The result message, or null if there was no error." }
]}
]
}
Loading

0 comments on commit 44a9b18

Please sign in to comment.