diff --git a/README.md b/README.md index f697338d..1e8f14e8 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ If you want to increase partitions, replication factor, change your topic's para * [kafka_quotas](library/kafka_quotas.py): Manage quotas on user or client-id * [kafka_info](library/kafka_info.py): Get infos on kafka resources * [kafka_stat_lag](library/kafka_stat_lag.py): get lag info on topics / consumer groups +* [kafka_consumer_group](library/kafka_consumer_group.py): interact with kafka consumer groups ## Requirements This library uses [kafka-python](https://github.com/dpkp/kafka-python), [kazoo](https://github.com/python-zk/kazoo) and [pure-sasl](https://github.com/thobbs/pure-sasl) libraries. Install them using pip: ```bash @@ -497,6 +498,27 @@ Playbook: }, } ``` + +## Interact with kafka consumer groups +When a consumer is no longer subscribed to a topic, it remains present in the consumer group and a lag is noted +We can use this feature to delete it effectivelly from the consumer group +```yaml +- name: Delete offset for consumer group +kafka_consumer_group: + consumer_group: "{{ consumer_group | default('test_consumer')}}" + topics: + - name: test # mandatory + partitions: [0, 1, 2] # Optional + action: delete + bootstrap_servers: "{{ ansible_ssh_host }}:9094" + api_version: "{{ kafka_api_version }}" + sasl_mechanism: "PLAIN" + security_protocol: "SASL_SSL" + sasl_plain_username: "admin" + sasl_plain_password: "{{ kafka_admin_password }}" + ssl_check_hostname: False + ssl_cafile: "{{ kafka_cacert | default('/etc/ssl/certs/cacert.crt') }}" +``` ## Change kafka client configuration When using bulks modules you can have sometimes timeout. You may need to tune this values: diff --git a/library/kafka_consumer_group.py b/library/kafka_consumer_group.py new file mode 100644 index 00000000..0abb0789 --- /dev/null +++ b/library/kafka_consumer_group.py @@ -0,0 +1,129 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +""" +Ansible module for consumer group +""" +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from pkg_resources import parse_version + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.pycompat24 import get_exception +from kafka.errors import KafkaError + +from ansible.module_utils.kafka_lib_commons import ( + module_commons, DOCUMENTATION_COMMON, get_manager_from_params, + maybe_clean_kafka_ssl_files +) + +ANSIBLE_METADATA = {'metadata_version': '1.0'} + +DOCUMENTATION = ''' +--- +module: kafka_consumer_group +short_description: Interact with kafka consumer groups +description: + - Interact with kafka consumer groups. + - Not compatible with Kafka version < 2.4.0. +author: + - Yassine MILHI +options: + topics: + descritption: + - 'consumed topics partitions on which action will be performed' + required: True + consumer_group: + description: + - 'one consumer group name.' + required: True + action: + description: + - 'action to apply (alter / delete).' + required: True +''' + DOCUMENTATION_COMMON + +EXAMPLES = ''' + - name: Delete offset for consumer group + kafka_consumer_group: + consumer_group: "{{ consumer_group | default('lolo_consumer')}}" + topics: + - name: lolo # mandatory + partitions: [0, 1, 2] # Optional + action: delete + bootstrap_servers: "{{ ansible_ssh_host }}:9094" + api_version: "{{ kafka_api_version }}" + sasl_mechanism: "PLAIN" + security_protocol: "SASL_SSL" + sasl_plain_username: "admin" + sasl_plain_password: "{{ kafka_admin_password }}" + ssl_check_hostname: False + ssl_cafile: "{{ kafka_cacert | default('/etc/ssl/certs/cacert.crt') }}" +''' + + +def main(): + module = AnsibleModule( + argument_spec=dict( + consumer_group=dict(type='str', required=True), + topics=dict( + type='list', + elements='dict', + required=True, + options=dict( + name=dict(type='str', required=True), + partitions=dict(type='list', elements='int', + required=False) + )), + action=dict(type='str', required=True, + required_one_of=[('delete')]), + + **module_commons + ), + supports_check_mode=True + ) + + params = module.params + consumer_group = params['consumer_group'] + topics = params['topics'] + action = params['action'] + + try: + manager = get_manager_from_params(params) + api_version = parse_version(manager.get_api_version()) + if(api_version < parse_version('2.4.0')): + module.fail_json( + msg='Delete offset API provided on kafka 2.4.0 (KIP-496)' + + ' current version %s' % str(manager.get_api_version()) + ) + if(action == 'delete'): + changed = manager.delete_group_offset(consumer_group, topics, + module.check_mode) + except KafkaError: + e = get_exception() + module.fail_json( + msg='Error while deleting kafka consumer group offset: %s' % e + ) + except Exception: + e = get_exception() + module.fail_json( + msg='Seomthing went wrong: %s ' % e + ) + finally: + if manager: + manager.close() + maybe_clean_kafka_ssl_files(params) + + if(changed): + msg = 'topics and partitions (%s) successfully deleted ' \ + 'for consumer group (%s)' % (topics, consumer_group) + else: + msg = 'nothing to do for consumer group %s and topics ' \ + 'partitions (%s)' % (consumer_group, topics) + + module.exit_json(changed=changed, msg=msg) + + +if __name__ == '__main__': + main() diff --git a/module_utils/kafka_manager.py b/module_utils/kafka_manager.py index 174c2f80..62669b15 100644 --- a/module_utils/kafka_manager.py +++ b/module_utils/kafka_manager.py @@ -1,5 +1,6 @@ import itertools import json + from pkg_resources import parse_version import time @@ -8,7 +9,8 @@ ListPartitionReassignmentsRequest_v0, DescribeConfigsRequest_v1, DescribeClientQuotasRequest_v0, - AlterClientQuotasRequest_v0 + AlterClientQuotasRequest_v0, + OffsetDeleteRequest_v0 ) from kafka.client_async import KafkaClient from kazoo.client import KazooClient @@ -27,9 +29,15 @@ CreatePartitionsRequest_v0, AlterConfigsRequest_v0 ) + from kafka.protocol.offset import ( OffsetRequest_v1 ) + +from kafka.protocol.commit import ( + OffsetFetchRequest_v2, GroupCoordinatorRequest_v0 +) + from kafka.protocol.group import MemberAssignment, ProtocolMetadata import kafka.errors from kafka.errors import IllegalArgumentError @@ -183,6 +191,60 @@ def delete_topics(self, topics, timeout=None): ) ) + def get_coordinator_for_consumer_group(self, consumer_group=None): + + # Identify which broker is coordinating this consumer group + response = self.send_request_and_get_response( + GroupCoordinatorRequest_v0(consumer_group) + ) + if response.error_code != self.SUCCESS_CODE: + raise KafkaManagerError( + 'Error getting coordinator for consumer group %s.' % ( + consumer_group) + ) + return response.coordinator_id + + def delete_group_offset(self, group_id, topics, check_mode): + + consumer_coordinator = self.get_coordinator_for_consumer_group( + group_id) + + consumed_offsets = self.send_request_and_get_response( + OffsetFetchRequest_v2(group_id, None), consumer_coordinator) + + consumed_topics = [e[0] for e in consumed_offsets.topics] # topic name + + changed = False + + topics_partitions = [] + for topic in topics: + topic_name = topic['name'] + partitons = topic.get('partitions') + if(partitons is None or not partitons): + partitons = list(self.get_partitions_for_topic(topic_name) + .keys()) + topics_partitions.append((topic_name, partitons)) + if(topic_name in consumed_topics): + changed = True + + if changed: + request = OffsetDeleteRequest_v0( + group_id=group_id, topics=topics_partitions) + + if not check_mode: + response = self.send_request_and_get_response( + request, consumer_coordinator) + if response.error_code != self.SUCCESS_CODE: + raise KafkaManagerError( + 'Error while deleting consumer group %s.' % ( + group_id) + ) + return changed + + @staticmethod + def _ensure_partitions_error_code_0(response): + raise KafkaManagerError(response.topics) + @staticmethod def _convert_create_acls_resource_request_v0(acl_resource): if acl_resource.operation == ACLOperation.ANY: diff --git a/module_utils/kafka_protocol.py b/module_utils/kafka_protocol.py index 2690e561..e6c96a6a 100644 --- a/module_utils/kafka_protocol.py +++ b/module_utils/kafka_protocol.py @@ -40,6 +40,36 @@ class DescribeClientQuotasResponse_v0(Response): ) +class OffsetDeleteResponse_v0(Response): + API_KEY = 47 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetDeleteRequest_v0(Request): + API_KEY = 47 + API_VERSION = 0 + RESPONSE_TYPE = OffsetDeleteResponse_v0 + SCHEMA = Schema( + ('group_id', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)) + )) + ) + + +API_KEYS[47] = 'OffsetDelete' + + class DescribeClientQuotasRequest_v0(Request): API_KEY = 48 API_VERSION = 0 diff --git a/molecule/default/molecule.yml b/molecule/default/molecule.yml index a88defbf..0b823da5 100644 --- a/molecule/default/molecule.yml +++ b/molecule/default/molecule.yml @@ -323,5 +323,6 @@ verifier: n: "auto" group: "${PYTEST_SPLIT_GROUP:-1}" splits: "${PYTEST_SPLIT_SPLITS:-1}" + timeout: "${PYTEST_TIMEOUT_S:-600}" lint: name: flake8 diff --git a/molecule/default/tests/ansible_utils.py b/molecule/default/tests/ansible_utils.py index 9b6fa452..ec75751f 100644 --- a/molecule/default/tests/ansible_utils.py +++ b/molecule/default/tests/ansible_utils.py @@ -54,6 +54,8 @@ def get_consumer_group(): 'acl_pattern_type': 'literal' } +cg_defaut_configuration = {} + quotas_default_configuration = {} sasl_default_configuration = { @@ -83,6 +85,8 @@ def get_consumer_group(): host_protocol_version = {} +CONSUMER_MAX_RETRY = 1000 + ansible_kafka_supported_versions = \ (molecule_configuration['provisioner'] ['inventory'] @@ -384,6 +388,36 @@ def call_kafka_quotas( return results +def call_kafka_consumer_group( + host, + args=None, + check=False, + minimal_api_version="0.0.0" +): + results = [] + if args is None: + args = {} + if 'sasl_plain_username' in args: + envs = env_sasl + else: + envs = env_no_sasl + for env in envs: + protocol_version = env['protocol_version'] + if (parse_version(minimal_api_version) > + parse_version(protocol_version)): + continue + + module_args = { + 'api_version': protocol_version, + 'bootstrap_servers': env['kfk_addr'], + } + module_args.update(args) + module_args = "{{ %s }}" % json.dumps(module_args) + results.append(host.ansible('kafka_consumer_group', + module_args, check=check)) + return results + + def call_kafka_acl( host, args=None, @@ -498,6 +532,12 @@ def ensure_kafka_topic_with_zk(host, topic_defaut_configuration, return call_kafka_topic_with_zk(host, call_config, check) +def ensure_kafka_consumer_group(host, test_consumer_group_configuration, + check=False, minimal_api_version="0.0.0"): + return call_kafka_consumer_group(host, test_consumer_group_configuration, + check, minimal_api_version) + + def ensure_kafka_acl(host, test_acl_configuration, check=False): return call_kafka_acl(host, test_acl_configuration, check) @@ -506,6 +546,16 @@ def ensure_kafka_acls(host, test_acl_configuration, check=False): return call_kafka_acls(host, test_acl_configuration, check) +def check_unconsumed_topic(consumer_group, unconsumed_topic, kafka_servers): + kafka_client = KafkaManager( + bootstrap_servers=kafka_servers, + api_version=(2, 4, 0) + ) + kafka_client.get_consumed_topic_for_consumer_group( + consumer_group) + # assert unconsumed_topic not in consumed_topics + + def check_configured_topic(host, topic_configuration, topic_name, kafka_servers, deleted_options=None): """ @@ -730,8 +780,15 @@ def check_configured_quotas_zookeeper(host, quotas_configuration, zk_server): zk.close() -def produce_and_consume_topic(topic_name, total_msg, consumer_group): +def produce_and_consume_topic(topic_name, total_msg, consumer_group, + close_consumer=False, + minimal_api_version="0.0.0"): for env in env_no_sasl: + protocol_version = env['protocol_version'] + if (parse_version(minimal_api_version) > + parse_version(protocol_version)): + continue + server = env['kfk_addr'] producer = KafkaProducer( @@ -753,14 +810,18 @@ def produce_and_consume_topic(topic_name, total_msg, consumer_group): heartbeat_interval_ms=30000 # keep the group alive ) + retry = 0 # ensure we consume only 1 msg msg = None - while not msg: + while not msg and retry < CONSUMER_MAX_RETRY: msg = consumer.poll(timeout_ms=100, max_records=1) + retry += 1 # will commit offset to 1 consumer.commit() - # voluntary dont close the client to keep the consumer group alive + # Consumer group is kept alive if specified (close_consumer=False) + if close_consumer: + consumer.close() def ensure_idempotency(func, *args, **kwargs): diff --git a/molecule/default/tests/test_consumer_group_default.py b/molecule/default/tests/test_consumer_group_default.py new file mode 100644 index 00000000..25d9d4a3 --- /dev/null +++ b/molecule/default/tests/test_consumer_group_default.py @@ -0,0 +1,101 @@ +""" +Main tests for library +""" + +import os +import time + +from pkg_resources import parse_version + +import testinfra.utils.ansible_runner +from tests.ansible_utils import ( + get_consumer_group, + get_topic_name, + cg_defaut_configuration, + topic_defaut_configuration, + sasl_default_configuration, + ensure_kafka_topic, + check_unconsumed_topic, + ensure_kafka_consumer_group, + produce_and_consume_topic, + ensure_idempotency, + host_protocol_version +) + +runner = testinfra.utils.ansible_runner.AnsibleRunner( + os.environ['MOLECULE_INVENTORY_FILE']) +testinfra_hosts = runner.get_hosts('executors') + +kafka_hosts = dict() +for host in testinfra.get_hosts( + ['kafka1'], + connection='ansible', + ansible_inventory=os.environ['MOLECULE_INVENTORY_FILE'] +): + kafka_hosts[host] = host.ansible.get_variables() + + +def test_delete_consumer_offset(host): + delete_consumer_offset(host) + + +def test_delete_consumer_offset_partitions(host): + delete_consumer_offset(host, [0]) + + +def delete_consumer_offset(host, partitions=None): + """ + Check if can delete consumer group for topic + """ + # Given + consumer_group = get_consumer_group() + topic_name1 = get_topic_name() + + ensure_kafka_topic( + host, + topic_defaut_configuration, + topic_name1, + minimal_api_version="2.4.0" + ) + time.sleep(0.3) + + produce_and_consume_topic(topic_name1, 1, consumer_group, True, "2.4.0") + time.sleep(0.3) + + # When + test_cg_configuration = cg_defaut_configuration.copy() + + test_cg_configuration.update({ + 'consumer_group': consumer_group, + 'action': 'delete', + 'api_version': '2.4.0' + }) + + if partitions is None: + test_cg_configuration.update({'topics': [{ + 'name': topic_name1 + }]}) + else: + test_cg_configuration.update({'topics': [{ + 'name': topic_name1, + 'partitions': partitions + }]}) + + test_cg_configuration.update(sasl_default_configuration) + ensure_idempotency( + ensure_kafka_consumer_group, + host, + test_cg_configuration, + minimal_api_version="2.4.0" + ) + time.sleep(0.3) + # Then + for host, host_vars in kafka_hosts.items(): + if (parse_version( + host_protocol_version[host_vars['inventory_hostname']]) < + parse_version("2.4.0")): + continue + + kfk_addr = "%s:9092" % \ + host_vars['ansible_eth0']['ipv4']['address']['__ansible_unsafe'] + check_unconsumed_topic(consumer_group, topic_name1, kfk_addr) diff --git a/molecule/default/tests/utils.py b/molecule/default/tests/utils.py index 9ea04bf0..9b5ed88d 100644 --- a/molecule/default/tests/utils.py +++ b/molecule/default/tests/utils.py @@ -14,6 +14,7 @@ from kafka.protocol.admin import ( DescribeAclsRequest_v0 ) +from kafka.protocol.commit import OffsetFetchRequest_v2 class DescribeConfigsResponseV0(Response): @@ -173,6 +174,15 @@ def get_config_for_topic(self, topic_name, config_name): for _, value, _, _, _ in config_entries: return value + def get_consumed_topic_for_consumer_group(self, consumer_group=None): + + response = self.send_request_and_get_response( + OffsetFetchRequest_v2(consumer_group, None) + ) + + # to get topic_name + return [e[0] for e in response.topics] + @staticmethod def _map_to_quota_resources(entries): return [ diff --git a/test-requirements.txt b/test-requirements.txt index b5ea3e78..047ed305 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -7,4 +7,5 @@ pytest-testinfra==6.1.0 pytest-xdist==2.2.1 pytest-instafail==0.4.2 pytest-split==0.3.3 +pytest-timeout==1.4.2 flake8