diff --git a/examples/acl-creation-multiops/docker-compose.yml b/examples/acl-creation-multiops/docker-compose.yml new file mode 100644 index 00000000..f54340a5 --- /dev/null +++ b/examples/acl-creation-multiops/docker-compose.yml @@ -0,0 +1,27 @@ +--- +version: '2' +services: + zookeeper: + image: zookeeper:3.6 + command: "bin/zkServer.sh start-foreground" + network_mode: "host" + container_name: zookeeper + kafka: + image: wurstmeister/kafka:2.13-2.6.0 + command: "start-kafka.sh" + container_name: kafka + network_mode: "host" + environment: + KAFKA_DELETE_TOPIC_ENABLE: "true" + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer + KAFKA_SUPER_USERS: User:admin + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" + KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/jaas/kafka_server_jaas.conf + volumes: + - ./kafka_server_jaas.conf:/opt/kafka/jaas/kafka_server_jaas.conf diff --git a/examples/acl-creation-multiops/kafka_server_jaas.conf b/examples/acl-creation-multiops/kafka_server_jaas.conf new file mode 100644 index 00000000..8b48239e --- /dev/null +++ b/examples/acl-creation-multiops/kafka_server_jaas.conf @@ -0,0 +1,7 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin-secret" + user_admin="admin-secret" + user_alice="alice-secret"; +}; \ No newline at end of file diff --git a/examples/acl-creation-multiops/playbook.yml b/examples/acl-creation-multiops/playbook.yml new file mode 100644 index 00000000..198eb727 --- /dev/null +++ b/examples/acl-creation-multiops/playbook.yml @@ -0,0 +1,98 @@ +--- +- name: Example | ACL creation + hosts: 127.0.0.1 + roles: + - name: kafka_lib + post_tasks: + - name: "Create a single ACL with multiple operations" + kafka_acl: + name: 'my-topic' + api_version: "2.6.0" + acl_resource_type: 'topic' + acl_principal: 'User:producer-client' + acl_operation: 'write' + acl_permission: 'allow' + acl_pattern_type: 'literal' + bootstrap_servers: "localhost:9092" + + - name: "Create a single ACL with multiple operations" + kafka_acl: + name: 'my-topic' + api_version: "2.6.0" + acl_resource_type: 'topic' + acl_principal: 'User:producer-client' + acl_operations: + - 'write' + - 'describe' + - 'create' + acl_permission: 'allow' + acl_pattern_type: 'literal' + bootstrap_servers: "localhost:9092" + + + - name: "Create multiple ACL with multiple operations" + kafka_acls: + + acls: + - name: 'my-topic' + acl_resource_type: 'topic' + acl_principal: 'User:consumer-client' + acl_operations: + - 'describe' + - 'read' + acl_permission: 'allow' + acl_pattern_type: 'literal' + + - name: 'my-consumer-group' + acl_resource_type: 'group' + acl_principal: 'User:consumer-client' + acl_operations: + - 'read' + acl_permission: 'allow' + acl_pattern_type: 'literal' + + api_version: "2.6.0" + bootstrap_servers: "localhost:9092" + + - name: "Get ACLs information" + kafka_info: + resource: "acl" + api_version: "2.6.0" + bootstrap_servers: "localhost:9092" + register: acls + + - name: "Display results" + debug: + var: acls + + + - name: "Delete multiple ACL with multiple operations" + kafka_acls: + + acls: + - name: 'my-topic' + acl_resource_type: 'topic' + acl_principal: 'User:producer-client' + acl_operations: + - 'write' + acl_permission: 'allow' + acl_pattern_type: 'literal' + state: 'absent' + + # Delete ALL operations + - name: 'my-topic' + acl_principal: 'User:consumer-client' + state: 'absent' + + bootstrap_servers: "localhost:9092" + + - name: "Get ACLs information" + kafka_info: + resource: "acl" + api_version: "2.6.0" + bootstrap_servers: "localhost:9092" + register: acls + + - name: "Display results" + debug: + var: acls diff --git a/library/kafka_acl.py b/library/kafka_acl.py index 465431d1..7bf98faa 100644 --- a/library/kafka_acl.py +++ b/library/kafka_acl.py @@ -15,7 +15,7 @@ from ansible.module_utils.kafka_lib_acl import process_module_acl from ansible.module_utils.kafka_lib_commons import ( - module_commons, module_acl_commons, + module_commons, module_acl_commons, module_acl_commons_validations, DOCUMENTATION_COMMON ) @@ -52,6 +52,7 @@ state: description: - 'state of the managed resource.' + - 'when state = present, one of acl_operation|acl_operations is required' default: present choices: [present, absent] acl_resource_type: @@ -68,6 +69,13 @@ acl_operation: description: - 'the operation the ACL controls.' + - 'mutually exclusive with acl_operation' + choices: [all, alter, alter_configs, cluster_action, create, delete, + describe, describe_configs, idempotent_write, read, write] + acl_operations: + description: + - 'a list of operations the ACL controls.' + - 'mutually exclusive with acl_operation' choices: [all, alter, alter_configs, cluster_action, create, delete, describe, describe_configs, idempotent_write, read, write] acl_pattern_type: @@ -120,20 +128,15 @@ def main(): """ Module usage """ - spec = dict( - # resource name - name=dict(type='str', required=True), - - state=dict(choices=['present', 'absent'], default='present'), - **module_commons ) spec.update(module_acl_commons) module = AnsibleModule( argument_spec=spec, - supports_check_mode=True + supports_check_mode=True, + **module_acl_commons_validations ) process_module_acl(module) diff --git a/library/kafka_acls.py b/library/kafka_acls.py index f81815f8..90325f66 100644 --- a/library/kafka_acls.py +++ b/library/kafka_acls.py @@ -15,7 +15,7 @@ from ansible.module_utils.kafka_lib_acl import process_module_acls from ansible.module_utils.kafka_lib_commons import ( - module_commons, module_acl_commons, + module_commons, module_acl_commons, module_acl_commons_validations, DOCUMENTATION_COMMON ) @@ -94,14 +94,11 @@ def main(): spec = dict( mark_others_as_absent=dict(type='bool', default=False), acls=dict( - type='list', - elements='dict', - required=True, - options=dict( - name=dict(type='str', required=True), - state=dict(choices=['present', 'absent'], default='present'), - **module_acl_commons - ) + type='list', + elements='dict', + required=True, + options=module_acl_commons, + **module_acl_commons_validations ), **module_commons ) diff --git a/module_utils/kafka_acl.py b/module_utils/kafka_acl.py index 9b73e7e5..b2e8dce6 100644 --- a/module_utils/kafka_acl.py +++ b/module_utils/kafka_acl.py @@ -23,6 +23,10 @@ class ACLOperation(IntEnum): ALTER_CONFIGS = 11, IDEMPOTENT_WRITE = 12 + def __eq__(self, other): + return int(self) == int(other) or \ + self is ACLOperation.ANY or other is ACLOperation.ANY + @staticmethod def from_name(name): if not isinstance(name, str): @@ -190,7 +194,7 @@ def __eq__(self, other): return NotImplemented return ( self.resource_type.value == other.resource_type.value and - self.operation.value == other.operation.value and + self.operation == other.operation and self.permission_type.value == other.permission_type.value and self.name == other.name and self.principal == other.principal and diff --git a/module_utils/kafka_lib_acl.py b/module_utils/kafka_lib_acl.py index d8a3ab59..b8214ff6 100644 --- a/module_utils/kafka_lib_acl.py +++ b/module_utils/kafka_lib_acl.py @@ -19,12 +19,41 @@ maybe_clean_zk_ssl_files ) +MATCH_ANY_RESOURCE = ACLResource( + resource_type=ACLResourceType.ANY, + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + pattern_type=ACLPatternType.ANY, + name=None, + principal=None, + host=None +) + + +def build_acl(acl, operation): + return ACLResource( + resource_type=ACLResourceType.from_name( + acl['acl_resource_type'] + ), + operation=operation, + permission_type=ACLPermissionType.from_name( + acl['acl_permission'] + ), + pattern_type=ACLPatternType.from_name( + acl['acl_pattern_type'] + ), + name=acl['name'], + principal=acl['acl_principal'], + host=acl['acl_host'] + ) + def process_module_acl(module): params = module.params.copy() params['acls'] = [{ 'name': params['name'], 'acl_operation': params['acl_operation'], + 'acl_operations': params['acl_operations'], 'acl_permission': params['acl_permission'], 'acl_pattern_type': params['acl_pattern_type'], 'acl_host': params['acl_host'], @@ -53,8 +82,12 @@ def process_module_acls(module, params=None): api_version = parse_version(manager.get_api_version()) for acl in acls: - if not acl['acl_operation']: - module.fail_json(msg="acl_operation is required") + + if 'acl_operation' in acl and acl['acl_operation'] is not None: + acl['acl_operations'] = [acl['acl_operation']] + + if acl['acl_operations'] is None: + acl['acl_operations'] = ['any'] if acl['acl_resource_type'].lower() == 'broker': module.deprecate( @@ -62,79 +95,44 @@ def process_module_acls(module, params=None): 'instead' ) - if len(acls) > 1: - acl_resource = ACLResource( - resource_type=ACLResourceType.ANY, - operation=ACLOperation.ANY, - permission_type=ACLPermissionType.ANY, - pattern_type=ACLPatternType.ANY, - name=None, - principal=None, - host=None - ) - else: - acl = acls[0] - - acl_name = acl['name'] - acl_resource_type = acl['acl_resource_type'] - acl_principal = acl['acl_principal'] - acl_operation = acl['acl_operation'] - acl_permission = acl['acl_permission'] - acl_pattern_type = acl['acl_pattern_type'] - acl_host = acl['acl_host'] - - acl_resource = ACLResource( - resource_type=ACLResourceType.from_name(acl_resource_type), - operation=ACLOperation.from_name(acl_operation), - permission_type=ACLPermissionType.from_name( - acl_permission - ), - pattern_type=ACLPatternType.from_name(acl_pattern_type), - name=acl_name, - principal=acl_principal, - host=acl_host - ) - acl_resource_found = manager.describe_acls( - acl_resource, api_version - ) + acls_marked_present = [ + build_acl(acl, ACLOperation.from_name(operation)) + for acl in acls for operation in acl['acl_operations'] + if acl['state'] == 'present' + ] - acls_marked_present = [ACLResource( - resource_type=ACLResourceType.from_name(acl['acl_resource_type']), - operation=ACLOperation.from_name(acl['acl_operation']), - permission_type=ACLPermissionType.from_name( - acl['acl_permission'] - ), - pattern_type=ACLPatternType.from_name(acl['acl_pattern_type']), - name=acl['name'], - principal=acl['acl_principal'], - host=acl['acl_host'] - ) for acl in acls if acl['state'] == 'present'] - acls_marked_absent = [ACLResource( - resource_type=ACLResourceType.from_name(acl['acl_resource_type']), - operation=ACLOperation.from_name(acl['acl_operation']), - permission_type=ACLPermissionType.from_name( - acl['acl_permission'] - ), - pattern_type=ACLPatternType.from_name(acl['acl_pattern_type']), - name=acl['name'], - principal=acl['acl_principal'], - host=acl['acl_host'] - ) for acl in acls if acl['state'] == 'absent'] + acls_marked_absent = [ + build_acl(acl, ACLOperation.from_name(operation)) + for acl in acls for operation in acl['acl_operations'] + if acl['state'] == 'absent' + ] # Check for duplicated acls - duplicated_acls = [acl for acl, count in collections.Counter( - acls_marked_absent + acls_marked_present - ).items() if count > 1] + duplicated_acls = [ + acl for acl, count in collections.Counter( + acls_marked_absent + acls_marked_present).items() if count > 1] + if len(duplicated_acls) > 0: module.fail_json( msg='Got duplicated acls in \'acls\': %s' % duplicated_acls ) return + acl_resource = build_acl(acls[0], ACLOperation.ANY) \ + if len(acls) == 1 else MATCH_ANY_RESOURCE + + acl_resource_found = manager.describe_acls( + acl_resource, api_version + ) + acls_to_add = [acl for acl in acls_marked_present if acl not in acl_resource_found] - acls_to_delete = [acl for acl in acls_marked_absent - if acl in acl_resource_found] + + # loop over acl_resource_found instead of acls_marked_absent + # this allow to delete correct and specific operations + # in case of ANY matching + acls_to_delete = [acl for acl in acl_resource_found + if acl in acls_marked_absent] # Cleanup others acls if mark_others_as_absent: diff --git a/module_utils/kafka_lib_commons.py b/module_utils/kafka_lib_commons.py index 69916c32..fdc3b113 100644 --- a/module_utils/kafka_lib_commons.py +++ b/module_utils/kafka_lib_commons.py @@ -87,25 +87,71 @@ options=dict(required=False, type='dict', default={}), ) +acl_operations_choices = [ + 'all', + 'alter', + 'alter_configs', + 'cluster_action', + 'create', + 'delete', + 'describe', + 'describe_configs', + 'idempotent_write', + 'read', + 'write' +] + +module_acl_commons_validations = dict( + mutually_exclusive=[ + ('acl_operation', 'acl_operations') + ], + required_if=[ + ('state', 'present', ('acl_operation', 'acl_operations'), True) + ] +) + module_acl_commons = dict( - acl_resource_type=dict(choices=['topic', 'broker', 'cluster', - 'delegation_token', 'group', - 'transactional_id'], - default='topic'), + + name=dict(type='str', required=True), + state=dict( + choices=['present', 'absent'], + default='present' + ), + acl_resource_type=dict( + choices=[ + 'topic', + 'broker', + 'cluster', + 'delegation_token', + 'group', + 'transactional_id' + ], + default='topic' + ), acl_principal=dict(type='str', required=False), + acl_operations=dict( + type='list', + elements='str', + choices=acl_operations_choices + ), + acl_operation=dict( + choices=acl_operations_choices, + required=False + ), - acl_operation=dict(choices=['all', 'alter', 'alter_configs', - 'cluster_action', 'create', 'delete', - 'describe', 'describe_configs', - 'idempotent_write', 'read', 'write'], - required=False), - acl_pattern_type=dict(choice=['any', 'match', 'literal', - 'prefixed'], - required=False, default='literal'), + acl_pattern_type=dict( + choice=[ + 'any', + 'match', + 'literal', + 'prefixed' + ], + required=False, + default='literal' + ), acl_permission=dict(choices=['allow', 'deny'], default='allow'), - acl_host=dict(type='str', required=False, default="*"), ) diff --git a/module_utils/kafka_manager.py b/module_utils/kafka_manager.py index 0f91f26a..8ae136d9 100644 --- a/module_utils/kafka_manager.py +++ b/module_utils/kafka_manager.py @@ -245,7 +245,7 @@ def _ensure_partitions_error_code_0(response): @staticmethod def _convert_create_acls_resource_request_v0(acl_resource): - if acl_resource.operation == ACLOperation.ANY: + if acl_resource.operation is ACLOperation.ANY: raise IllegalArgumentError("operation must not be ANY") if acl_resource.permission_type == ACLPermissionType.ANY: raise IllegalArgumentError("permission_type must not be ANY") @@ -261,7 +261,7 @@ def _convert_create_acls_resource_request_v0(acl_resource): @staticmethod def _convert_create_acls_resource_request_v1(acl_resource): - if acl_resource.operation == ACLOperation.ANY: + if acl_resource.operation is ACLOperation.ANY: raise IllegalArgumentError("operation must not be ANY") if acl_resource.permission_type == ACLPermissionType.ANY: raise IllegalArgumentError("permission_type must not be ANY") diff --git a/molecule/default/tests/ansible_utils.py b/molecule/default/tests/ansible_utils.py index 7bdbd0be..7005c279 100644 --- a/molecule/default/tests/ansible_utils.py +++ b/molecule/default/tests/ansible_utils.py @@ -54,6 +54,18 @@ def get_consumer_group(): 'acl_pattern_type': 'literal' } +acl_multi_ops_configuration = { + 'acl_resource_type': 'topic', + 'state': 'absent', + 'acl_principal': 'User:common', + 'acl_operations': [ + 'write', + 'describe' + ], + 'acl_permission': 'allow', + 'acl_pattern_type': 'literal' +} + cg_defaut_configuration = {} quotas_default_configuration = {} diff --git a/molecule/default/tests/test_acl_default.py b/molecule/default/tests/test_acl_default.py index f97494ab..156c4e67 100644 --- a/molecule/default/tests/test_acl_default.py +++ b/molecule/default/tests/test_acl_default.py @@ -1,13 +1,15 @@ """ Main tests for library """ - import os import time +import pytest + import testinfra.utils.ansible_runner from tests.ansible_utils import ( acl_defaut_configuration, + acl_multi_ops_configuration, sasl_default_configuration, ensure_kafka_acl, get_acl_name, check_configured_acl, ensure_idempotency, @@ -27,12 +29,23 @@ kafka_hosts[host] = host.ansible.get_variables() -def test_acl_create(host): +acl_configurations_testdata = [ + pytest.param( + acl_defaut_configuration, id="default_acl" + ), + pytest.param( + acl_multi_ops_configuration, id="multi_ops_acl" + ), +] + + +@pytest.mark.parametrize("acl_configuration", acl_configurations_testdata) +def test_acl_create(host, acl_configuration): """ Check if can create acls """ # Given - test_acl_configuration = acl_defaut_configuration.copy() + test_acl_configuration = acl_configuration.copy() test_acl_configuration.update({ 'name': get_acl_name(), 'state': 'absent' @@ -60,12 +73,13 @@ def test_acl_create(host): check_configured_acl(host, test_acl_configuration, kfk_addr) -def test_acl_delete(host): +@pytest.mark.parametrize("acl_configuration", acl_configurations_testdata) +def test_acl_delete(host, acl_configuration): """ Check if can delete acls """ # Given - test_acl_configuration = acl_defaut_configuration.copy() + test_acl_configuration = acl_configuration.copy() test_acl_configuration.update({ 'name': get_acl_name(), 'state': 'present' @@ -93,12 +107,13 @@ def test_acl_delete(host): check_configured_acl(host, test_acl_configuration, kfk_addr) -def test_check_mode(host): +@pytest.mark.parametrize("acl_configuration", acl_configurations_testdata) +def test_check_mode(host, acl_configuration): """ Check if can check mode do nothing """ # Given - test_acl_configuration = acl_defaut_configuration.copy() + test_acl_configuration = acl_configuration.copy() test_acl_configuration.update({ 'name': get_acl_name(), 'state': 'present' @@ -141,18 +156,20 @@ def test_check_mode(host): check_configured_acl(host, test_acl_configuration, kfk_sasl_addr) -def test_acls_create(host): +@pytest.mark.parametrize("acl_configuration", acl_configurations_testdata) +def test_acls_create(host, acl_configuration): """ Check if can create acls """ # Given def get_acl_config(): - acl_configuration = acl_defaut_configuration.copy() - acl_configuration.update({ + copy = acl_configuration.copy() + copy.update({ 'name': get_acl_name(), 'state': 'absent' }) - return acl_configuration + return copy + test_acl_configuration = { 'acls': [ get_acl_config(), @@ -183,7 +200,8 @@ def get_acl_config(): check_configured_acl(host, acl, kfk_addr) -def test_duplicated_acls(host): +@pytest.mark.parametrize("acl_configuration", acl_configurations_testdata) +def test_duplicated_acls(host, acl_configuration): """ Check if can create acls """ @@ -191,12 +209,13 @@ def test_duplicated_acls(host): duplicated_acl_name = get_acl_name() def get_acl_config(): - acl_configuration = acl_defaut_configuration.copy() - acl_configuration.update({ + copy = acl_configuration.copy() + copy.update({ 'name': duplicated_acl_name, 'state': 'present' }) - return acl_configuration + return copy + test_acl_configuration = { 'acls': [ get_acl_config(), diff --git a/tests/module_utils/test_kafka_acls.py b/tests/module_utils/test_kafka_acls.py new file mode 100644 index 00000000..c96c98aa --- /dev/null +++ b/tests/module_utils/test_kafka_acls.py @@ -0,0 +1,26 @@ + +from module_utils.kafka_acl import ACLResourceType as rs +from module_utils.kafka_acl import ACLOperation as op +from module_utils.kafka_acl import ( + ACLResource, ACLPermissionType, ACLPatternType +) + + +def test_acl_operations_equality(): + assert op.ANY == op.WRITE + assert op.WRITE == op.WRITE + assert op.READ != op.WRITE + + +def test_acl_resource_equality(): + commons = { + 'pattern_type': ACLPatternType.LITERAL, + 'name': 'my-topic', + 'principal': 'User:alice', + 'host': '*' + } + r1 = ACLResource(rs.TOPIC, op.WRITE, ACLPermissionType.ALLOW, **commons) + r2 = ACLResource(rs.TOPIC, op.ANY, ACLPermissionType.ALLOW, **commons) + + assert r1 == r2 + assert r1 in [r2]