Add feature to delete topic partition offsets for a given consumer group.
Yassine MILHI committed Sep 13, 2021
1 parent bac9c5e commit 19ed914
Showing 8 changed files with 399 additions and 4 deletions.
129 changes: 129 additions & 0 deletions library/kafka_consumer_group.py
@@ -0,0 +1,129 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Ansible module for consumer group
"""
from __future__ import absolute_import, division, print_function

__metaclass__ = type

from pkg_resources import parse_version

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.pycompat24 import get_exception
from kafka.errors import KafkaError

from ansible.module_utils.kafka_lib_commons import (
module_commons, DOCUMENTATION_COMMON, get_manager_from_params,
maybe_clean_kafka_ssl_files
)

ANSIBLE_METADATA = {'metadata_version': '1.0'}

DOCUMENTATION = '''
---
module: kafka_consumer_group
short_description: Interact with kafka consumer groups
description:
  - Interact with kafka consumer groups.
  - Not compatible with Kafka version < 2.4.0.
author:
  - Yassine MILHI
options:
  topics:
    description:
      - 'consumed topics and partitions on which the action will be performed'
    required: True
  consumer_group:
    description:
      - 'consumer group name.'
    required: True
  action:
    description:
      - 'action to apply (only delete is currently supported).'
    required: True
''' + DOCUMENTATION_COMMON

EXAMPLES = '''
- name: Delete offset for consumer group
  kafka_consumer_group:
    consumer_group: "{{ consumer_group | default('lolo_consumer') }}"
    topics:
      - name: lolo             # mandatory
        partitions: [0, 1, 2]  # optional
    action: delete
    bootstrap_servers: "{{ ansible_ssh_host }}:9094"
    api_version: "{{ kafka_api_version }}"
    sasl_mechanism: "PLAIN"
    security_protocol: "SASL_SSL"
    sasl_plain_username: "admin"
    sasl_plain_password: "{{ kafka_admin_password }}"
    ssl_check_hostname: False
    ssl_cafile: "{{ kafka_cacert | default('/etc/ssl/certs/cacert.crt') }}"
'''


def main():
    module = AnsibleModule(
        argument_spec=dict(
            consumer_group=dict(type='str', required=True),
            topics=dict(
                type='list',
                elements='dict',
                required=True,
                options=dict(
                    name=dict(type='str', required=True),
                    partitions=dict(type='list', elements='int',
                                    required=False)
                )),
            action=dict(type='str', required=True,
                        choices=['delete']),
            **module_commons
        ),
        supports_check_mode=True
    )

    params = module.params
    consumer_group = params['consumer_group']
    topics = params['topics']
    action = params['action']

    changed = False
    manager = None
    try:
        manager = get_manager_from_params(params)
        api_version = parse_version(manager.get_api_version())
        if api_version < parse_version('2.4.0'):
            module.fail_json(
                msg='The delete offsets API requires Kafka >= 2.4.0 '
                    '(KIP-496); current version is %s'
                    % str(manager.get_api_version())
            )
        if action == 'delete':
            changed = manager.delete_group_offset(consumer_group, topics,
                                                  module.check_mode)
    except KafkaError:
        e = get_exception()
        module.fail_json(
            msg='Error while deleting kafka consumer group offset: %s' % e
        )
    except Exception:
        e = get_exception()
        module.fail_json(
            msg='Something went wrong: %s' % e
        )
    finally:
        if manager:
            manager.close()
        maybe_clean_kafka_ssl_files(params)

    if changed:
        msg = 'topics and partitions (%s) successfully deleted ' \
              'for consumer group (%s)' % (topics, consumer_group)
    else:
        msg = 'nothing to do for consumer group %s and topics ' \
              'partitions (%s)' % (consumer_group, topics)

    module.exit_json(changed=changed, msg=msg)


if __name__ == '__main__':
    main()
64 changes: 63 additions & 1 deletion module_utils/kafka_manager.py
@@ -1,5 +1,6 @@
import itertools
import json

from pkg_resources import parse_version
import time

@@ -8,7 +9,8 @@
    ListPartitionReassignmentsRequest_v0,
    DescribeConfigsRequest_v1,
    DescribeClientQuotasRequest_v0,
    AlterClientQuotasRequest_v0,
    OffsetDeleteRequest_v0
)
from kafka.client_async import KafkaClient
from kazoo.client import KazooClient
@@ -27,9 +29,15 @@
    CreatePartitionsRequest_v0,
    AlterConfigsRequest_v0
)

from kafka.protocol.offset import (
    OffsetRequest_v1
)

from kafka.protocol.commit import (
    OffsetFetchRequest_v2, GroupCoordinatorRequest_v0
)

from kafka.protocol.group import MemberAssignment, ProtocolMetadata
import kafka.errors
from kafka.errors import IllegalArgumentError
@@ -183,6 +191,60 @@ def delete_topics(self, topics, timeout=None):
                )
            )

    def get_coordinator_for_consumer_group(self, consumer_group=None):
        # Identify which broker is coordinating this consumer group
        response = self.send_request_and_get_response(
            GroupCoordinatorRequest_v0(consumer_group)
        )
        if response.error_code != self.SUCCESS_CODE:
            raise KafkaManagerError(
                'Error getting coordinator for consumer group %s.' % (
                    consumer_group)
            )
        return response.coordinator_id

    def delete_group_offset(self, group_id, topics, check_mode):
        consumer_coordinator = self.get_coordinator_for_consumer_group(
            group_id)

        consumed_offsets = self.send_request_and_get_response(
            OffsetFetchRequest_v2(group_id, None), consumer_coordinator)

        consumed_topics = [e[0] for e in consumed_offsets.topics]  # topic name

        changed = False

        topics_partitions = []
        for topic in topics:
            topic_name = topic['name']
            partitions = topic.get('partitions')
            if not partitions:
                partitions = list(self.get_partitions_for_topic(topic_name)
                                  .keys())
            topics_partitions.append((topic_name, partitions))
            if topic_name in consumed_topics:
                changed = True

        if changed:
            request = OffsetDeleteRequest_v0(
                group_id=group_id, topics=topics_partitions)

            if not check_mode:
                response = self.send_request_and_get_response(
                    request, consumer_coordinator)
                if response.error_code != self.SUCCESS_CODE:
                    raise KafkaManagerError(
                        'Error while deleting consumer group %s.' % (
                            group_id)
                    )
        return changed

    @staticmethod
    def _ensure_partitions_error_code_0(response):
        raise KafkaManagerError(response.topics)

    @staticmethod
    def _convert_create_acls_resource_request_v0(acl_resource):
        if acl_resource.operation == ACLOperation.ANY:
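For reference, the delete flow added above follows the KIP-496 exchange: look up the group coordinator, then send the OffsetDelete request to that broker and check the error code. A minimal sketch of driving the new manager methods directly (group and topic names are hypothetical; a connected manager, as returned by get_manager_from_params, is assumed):

# Sketch (hypothetical names): locate the coordinator for the group,
# then delete its committed offsets via the OffsetDelete API.
coordinator = manager.get_coordinator_for_consumer_group('lolo_consumer')
request = OffsetDeleteRequest_v0(
    group_id='lolo_consumer',
    topics=[('lolo', [0, 1, 2])]  # [(topic_name, [partitions]), ...]
)
response = manager.send_request_and_get_response(request, coordinator)
assert response.error_code == manager.SUCCESS_CODE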
30 changes: 30 additions & 0 deletions module_utils/kafka_protocol.py
@@ -40,6 +40,36 @@ class DescribeClientQuotasResponse_v0(Response):
    )


class OffsetDeleteResponse_v0(Response):
    API_KEY = 47
    API_VERSION = 0
    SCHEMA = Schema(
        ('error_code', Int16),
        ('throttle_time_ms', Int32),
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('partition', Int32),
                ('error_code', Int16)))))
    )


class OffsetDeleteRequest_v0(Request):
    API_KEY = 47
    API_VERSION = 0
    RESPONSE_TYPE = OffsetDeleteResponse_v0
    SCHEMA = Schema(
        ('group_id', String('utf-8')),
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(Int32))
        ))
    )


API_KEYS[47] = 'OffsetDelete'


class DescribeClientQuotasRequest_v0(Request):
    API_KEY = 48
    API_VERSION = 0
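Given the schema above, a caller can check both the top-level and the per-partition error codes before declaring success. A small sketch, assuming the tuple layout that the Schema above decodes to:

# Sketch: walk an OffsetDeleteResponse_v0 and confirm the top-level
# and every per-partition error_code is 0.
def offset_delete_succeeded(response):
    if response.error_code != 0:
        return False
    for topic, partitions in response.topics:
        for partition, error_code in partitions:
            if error_code != 0:
                return False
    return True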
1 change: 1 addition & 0 deletions molecule/default/molecule.yml
@@ -323,5 +323,6 @@ verifier:
    n: "auto"
    group: "${PYTEST_SPLIT_GROUP:-1}"
    splits: "${PYTEST_SPLIT_SPLITS:-1}"
    timeout: "${PYTEST_TIMEOUT_S:-600}"
  lint:
    name: flake8
67 changes: 64 additions & 3 deletions molecule/default/tests/ansible_utils.py
@@ -54,6 +54,8 @@ def get_consumer_group():
    'acl_pattern_type': 'literal'
}

cg_defaut_configuration = {}

quotas_default_configuration = {}

sasl_default_configuration = {
@@ -83,6 +85,8 @@

host_protocol_version = {}

CONSUMER_MAX_RETRY = 1000

ansible_kafka_supported_versions = \
    (molecule_configuration['provisioner']
     ['inventory']
@@ -384,6 +388,36 @@ def call_kafka_quotas(
    return results


def call_kafka_consumer_group(
        host,
        args=None,
        check=False,
        minimal_api_version="0.0.0"
):
    results = []
    if args is None:
        args = {}
    if 'sasl_plain_username' in args:
        envs = env_sasl
    else:
        envs = env_no_sasl
    for env in envs:
        protocol_version = env['protocol_version']
        if (parse_version(minimal_api_version) >
                parse_version(protocol_version)):
            continue

        module_args = {
            'api_version': protocol_version,
            'bootstrap_servers': env['kfk_addr'],
        }
        module_args.update(args)
        module_args = "{{ %s }}" % json.dumps(module_args)
        results.append(host.ansible('kafka_consumer_group',
                                    module_args, check=check))
    return results


def call_kafka_acl(
        host,
        args=None,
@@ -498,6 +532,12 @@ def ensure_kafka_topic_with_zk(host, topic_defaut_configuration,
                               check=False):
    return call_kafka_topic_with_zk(host, call_config, check)

def ensure_kafka_consumer_group(host, test_consumer_group_configuration,
                                check=False, minimal_api_version="0.0.0"):
    return call_kafka_consumer_group(host, test_consumer_group_configuration,
                                     check, minimal_api_version)


def ensure_kafka_acl(host, test_acl_configuration, check=False):
    return call_kafka_acl(host, test_acl_configuration, check)

@@ -506,6 +546,16 @@ def ensure_kafka_acls(host, test_acl_configuration, check=False):
    return call_kafka_acls(host, test_acl_configuration, check)


def check_unconsumed_topic(consumer_group, unconsumed_topic, kafka_servers):
    kafka_client = KafkaManager(
        bootstrap_servers=kafka_servers,
        api_version=(2, 4, 0)
    )
    consumed_topics = kafka_client.get_consumed_topic_for_consumer_group(
        consumer_group)
    assert unconsumed_topic not in consumed_topics


def check_configured_topic(host, topic_configuration,
                           topic_name, kafka_servers, deleted_options=None):
    """
@@ -730,8 +780,15 @@ def check_configured_quotas_zookeeper(host, quotas_configuration, zk_server):
    zk.close()


def produce_and_consume_topic(topic_name, total_msg, consumer_group,
                              close_consumer=False,
                              minimal_api_version="0.0.0"):
    for env in env_no_sasl:
        protocol_version = env['protocol_version']
        if (parse_version(minimal_api_version) >
                parse_version(protocol_version)):
            continue

        server = env['kfk_addr']

        producer = KafkaProducer(
@@ -753,14 +810,18 @@ def produce_and_consume_topic(topic_name, total_msg, consumer_group):
            heartbeat_interval_ms=30000  # keep the group alive
        )

        retry = 0
        # ensure we consume only 1 msg
        msg = None
        while not msg and retry < CONSUMER_MAX_RETRY:
            msg = consumer.poll(timeout_ms=100, max_records=1)
            retry += 1

        # will commit offset to 1
        consumer.commit()
        # Consumer group is kept alive if specified (close_consumer=False)
        if close_consumer:
            consumer.close()

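Taken together, these helpers support an end-to-end check of the new module. A hypothetical pytest-style sketch (group name, topic name, and bootstrap address are invented for illustration):

# Hypothetical end-to-end test sketch: commit offsets, delete them
# through the kafka_consumer_group module, then verify the deletion.
def test_delete_consumer_group_offset(host):
    produce_and_consume_topic('lolo', 10, 'lolo_consumer',
                              close_consumer=True,
                              minimal_api_version='2.4.0')
    ensure_kafka_consumer_group(
        host,
        {
            'consumer_group': 'lolo_consumer',
            'topics': [{'name': 'lolo'}],
            'action': 'delete'
        },
        minimal_api_version='2.4.0')
    check_unconsumed_topic('lolo_consumer', 'lolo', 'localhost:9092')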

def ensure_idempotency(func, *args, **kwargs):