Skip to content

Commit

Permalink
Add pattern type support for kafka acls
Browse files Browse the repository at this point in the history
  • Loading branch information
ryarnyah authored and StephenSorriaux committed Sep 29, 2019
1 parent fcb2a0a commit 733cd46
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 20 deletions.
119 changes: 99 additions & 20 deletions library/kafka_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
CreateTopicsRequest_v0,
DeleteTopicsRequest_v0,
CreateAclsRequest_v0,
CreateAclsRequest_v1,
DeleteAclsRequest_v0,
DescribeAclsRequest_v0)
DescribeAclsRequest_v0,
DescribeAclsRequest_v1)
from kafka.protocol.types import (
Array, Boolean, Int8, Int16, Int32, Schema, String
)
Expand Down Expand Up @@ -100,6 +102,10 @@
- 'the operation the ACL controls.'
choices: [all, alter, alter_configs, cluster_actions, create, delete,
describe, describe_configs, idempotent_write, read, write]
acl_pattern_type:
description:
- 'the pattern type of the ACL. Need Kafka version >= 2.0.0'
choices: [any, match, literal, prefixed]
acl_permission:
description:
- 'should the ACL allow or deny the operation.'
Expand Down Expand Up @@ -693,6 +699,31 @@ def from_name(name):
raise ValueError("%r is not a valid ACLPermissionType" % name)


class ACLPatternType(IntEnum):
"""An enumerated type of pattern type for ACLs"""

ANY = 1,
MATCH = 2,
LITERAL = 3,
PREFIXED = 4

@staticmethod
def from_name(name):
if not isinstance(name, str):
raise ValueError("%r is not a valid ACLPatternType" % name)

if name.lower() == "any":
return ACLPatternType.ANY
elif name.lower() == "match":
return ACLPatternType.MATCH
elif name.lower() == "literal":
return ACLPatternType.LITERAL
elif name.lower() == "prefixed":
return ACLPatternType.PREFIXED
else:
raise ValueError("%r is not a valid ACLPatternType" % name)


class ACLResource(object):
"""A class for specifying config resources.
Arguments:
Expand All @@ -706,6 +737,7 @@ def __init__(
resource_type,
operation,
permission_type,
pattern_type=None,
name=None,
principal=None,
host=None,
Expand All @@ -722,15 +754,22 @@ def __init__(
raise IllegalArgumentError("permission_type must be of type "
"ACLPermissionType")
self.permission_type = permission_type
if pattern_type is not None and not isinstance(pattern_type,
ACLPatternType):
raise IllegalArgumentError("pattern_type must be of type "
"ACLPatternType")
self.pattern_type = pattern_type
self.name = name
self.principal = principal
self.host = host

def __repr__(self):
return "ACLResource(resource_type: %s, operation: %s, " \
"permission_type: %s, name: %s, principal: %s, host: %s)"\
"permission_type: %s, name: %s, principal: %s, host: %s, " \
"pattern_type: %s)" \
% (self.resource_type, self.operation,
self.permission_type, self.name, self.principal, self.host)
self.permission_type, self.name, self.principal, self.host,
self.pattern_type)


class KafkaManager:
Expand Down Expand Up @@ -845,6 +884,23 @@ def _convert_create_acls_resource_request_v0(acl_resource):
acl_resource.permission_type
)

@staticmethod
def _convert_create_acls_resource_request_v1(acl_resource):
if acl_resource.operation == ACLOperation.ANY:
raise IllegalArgumentError("operation must not be ANY")
if acl_resource.permission_type == ACLPermissionType.ANY:
raise IllegalArgumentError("permission_type must not be ANY")

return (
acl_resource.resource_type,
acl_resource.name,
acl_resource.pattern_type,
acl_resource.principal,
acl_resource.host,
acl_resource.operation,
acl_resource.permission_type
)

@staticmethod
def _convert_delete_acls_resource_request_v0(acl_resource):
return (
Expand All @@ -856,18 +912,29 @@ def _convert_delete_acls_resource_request_v0(acl_resource):
acl_resource.permission_type
)

def describe_acls(self, acl_resource):
def describe_acls(self, acl_resource, api_version):
"""Describe a set of ACLs
"""

request = DescribeAclsRequest_v0(
resource_type=acl_resource.resource_type,
resource_name=acl_resource.name,
principal=acl_resource.principal,
host=acl_resource.host,
operation=acl_resource.operation,
permission_type=acl_resource.permission_type
)
if api_version < parse_version('2.0.0'):
request = DescribeAclsRequest_v0(
resource_type=acl_resource.resource_type,
resource_name=acl_resource.name,
principal=acl_resource.principal,
host=acl_resource.host,
operation=acl_resource.operation,
permission_type=acl_resource.permission_type
)
else:
request = DescribeAclsRequest_v1(
resource_type=acl_resource.resource_type,
resource_name=acl_resource.name,
resource_pattern_type_filter=acl_resource.pattern_type,
principal=acl_resource.principal,
host=acl_resource.host,
operation=acl_resource.operation,
permission_type=acl_resource.permission_type
)

response = self.send_request_and_get_response(request)

Expand All @@ -883,14 +950,19 @@ def describe_acls(self, acl_resource):

return response.resources

def create_acls(self, acl_resources):
def create_acls(self, acl_resources, api_version):
"""Create a set of ACLs"""

request = CreateAclsRequest_v0(
creations=[self._convert_create_acls_resource_request_v0(
acl_resource) for acl_resource in acl_resources]
)

if api_version < parse_version('2.0.0'):
request = CreateAclsRequest_v0(
creations=[self._convert_create_acls_resource_request_v0(
acl_resource) for acl_resource in acl_resources]
)
else:
request = CreateAclsRequest_v1(
creations=[self._convert_create_acls_resource_request_v1(
acl_resource) for acl_resource in acl_resources]
)
response = self.send_request_and_get_response(request)

for error_code, error_message in response.creation_responses:
Expand Down Expand Up @@ -1366,6 +1438,9 @@ def main():
'describe', 'describe_configs',
'idempotent_write', 'read', 'write'],
required=False),
acl_pattern_type=dict(choice=['all', 'match', 'literal',
'prefixed'],
required=False, default='literal'),

acl_permission=dict(choices=['allow', 'deny'], default='allow'),

Expand Down Expand Up @@ -1499,6 +1574,7 @@ def main():
acl_principal = params['acl_principal']
acl_operation = params['acl_operation']
acl_permission = params['acl_permission']
acl_pattern_type = params['acl_pattern_type']
acl_host = params['acl_host']

api_version = tuple(
Expand Down Expand Up @@ -1648,20 +1724,23 @@ def main():
if not acl_operation:
module.fail_json(msg="acl_operation is required")

api_version = parse_version(manager.get_api_version())

acl_resource = ACLResource(
resource_type=ACLResourceType.from_name(acl_resource_type),
operation=ACLOperation.from_name(acl_operation),
permission_type=ACLPermissionType.from_name(acl_permission),
pattern_type=ACLPatternType.from_name(acl_pattern_type),
name=name,
principal=acl_principal,
host=acl_host)

acl_resource_found = manager.describe_acls(acl_resource)
acl_resource_found = manager.describe_acls(acl_resource, api_version)

if state == 'present':
if not acl_resource_found:
if not module.check_mode:
manager.create_acls([acl_resource])
manager.create_acls([acl_resource], api_version)
changed = True
msg += 'successfully created.'
elif state == 'absent':
Expand Down
2 changes: 2 additions & 0 deletions molecule/default/playbook.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
acl_principal: "User:common"
acl_operation: "write"
acl_permission: "allow"
acl_pattern_type: "literal"
state: "present"
bootstrap_servers: "{{ bootstrap_sasl }}"
security_protocol: SASL_PLAINTEXT
Expand Down Expand Up @@ -247,6 +248,7 @@
acl_principal: "User:common"
acl_operation: "write"
acl_permission: "allow"
acl_pattern_type: "literal"
state: "present"
bootstrap_servers: "{{ bootstrap_sasl }}"
security_protocol: SASL_PLAINTEXT
Expand Down

0 comments on commit 733cd46

Please sign in to comment.