Skip to content

Commit

Permalink
Merge pull request #106 from ymilhi/feat/delete-consumer-group-topic-…
Browse files Browse the repository at this point in the history
…partitions

Add feature to delete topic partiton for a given consumer group
  • Loading branch information
ryarnyah authored Sep 15, 2021
2 parents bac9c5e + c255c1b commit 291ba00
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 4 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ If you want to increase partitions, replication factor, change your topic's para
* [kafka_quotas](library/kafka_quotas.py): Manage quotas on user or client-id
* [kafka_info](library/kafka_info.py): Get infos on kafka resources
* [kafka_stat_lag](library/kafka_stat_lag.py): get lag info on topics / consumer groups
* [kafka_consumer_group](library/kafka_consumer_group.py): interact with kafka consumer groups
## Requirements
This library uses [kafka-python](https://github.com/dpkp/kafka-python), [kazoo](https://github.com/python-zk/kazoo) and [pure-sasl](https://github.com/thobbs/pure-sasl) libraries. Install them using pip:
```bash
Expand Down Expand Up @@ -497,6 +498,27 @@ Playbook:
},
}
```

## Interact with kafka consumer groups
When a consumer is no longer subscribed to a topic, it remains present in the consumer group and a lag is noted
We can use this feature to delete it effectivelly from the consumer group
```yaml
- name: Delete offset for consumer group
kafka_consumer_group:
consumer_group: "{{ consumer_group | default('test_consumer')}}"
topics:
- name: test # mandatory
partitions: [0, 1, 2] # Optional
action: delete
bootstrap_servers: "{{ ansible_ssh_host }}:9094"
api_version: "{{ kafka_api_version }}"
sasl_mechanism: "PLAIN"
security_protocol: "SASL_SSL"
sasl_plain_username: "admin"
sasl_plain_password: "{{ kafka_admin_password }}"
ssl_check_hostname: False
ssl_cafile: "{{ kafka_cacert | default('/etc/ssl/certs/cacert.crt') }}"
```
## Change kafka client configuration
When using bulks modules you can have sometimes timeout.
You may need to tune this values:
Expand Down
129 changes: 129 additions & 0 deletions library/kafka_consumer_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Ansible module for consumer group
"""
from __future__ import absolute_import, division, print_function

__metaclass__ = type

from pkg_resources import parse_version

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.pycompat24 import get_exception
from kafka.errors import KafkaError

from ansible.module_utils.kafka_lib_commons import (
module_commons, DOCUMENTATION_COMMON, get_manager_from_params,
maybe_clean_kafka_ssl_files
)

ANSIBLE_METADATA = {'metadata_version': '1.0'}

DOCUMENTATION = '''
---
module: kafka_consumer_group
short_description: Interact with kafka consumer groups
description:
- Interact with kafka consumer groups.
- Not compatible with Kafka version < 2.4.0.
author:
- Yassine MILHI
options:
topics:
descritption:
- 'consumed topics partitions on which action will be performed'
required: True
consumer_group:
description:
- 'one consumer group name.'
required: True
action:
description:
- 'action to apply (alter / delete).'
required: True
''' + DOCUMENTATION_COMMON

EXAMPLES = '''
- name: Delete offset for consumer group
kafka_consumer_group:
consumer_group: "{{ consumer_group | default('lolo_consumer')}}"
topics:
- name: lolo # mandatory
partitions: [0, 1, 2] # Optional
action: delete
bootstrap_servers: "{{ ansible_ssh_host }}:9094"
api_version: "{{ kafka_api_version }}"
sasl_mechanism: "PLAIN"
security_protocol: "SASL_SSL"
sasl_plain_username: "admin"
sasl_plain_password: "{{ kafka_admin_password }}"
ssl_check_hostname: False
ssl_cafile: "{{ kafka_cacert | default('/etc/ssl/certs/cacert.crt') }}"
'''


def main():
module = AnsibleModule(
argument_spec=dict(
consumer_group=dict(type='str', required=True),
topics=dict(
type='list',
elements='dict',
required=True,
options=dict(
name=dict(type='str', required=True),
partitions=dict(type='list', elements='int',
required=False)
)),
action=dict(type='str', required=True,
required_one_of=[('delete')]),

**module_commons
),
supports_check_mode=True
)

params = module.params
consumer_group = params['consumer_group']
topics = params['topics']
action = params['action']

try:
manager = get_manager_from_params(params)
api_version = parse_version(manager.get_api_version())
if(api_version < parse_version('2.4.0')):
module.fail_json(
msg='Delete offset API provided on kafka 2.4.0 (KIP-496)'
+ ' current version %s' % str(manager.get_api_version())
)
if(action == 'delete'):
changed = manager.delete_group_offset(consumer_group, topics,
module.check_mode)
except KafkaError:
e = get_exception()
module.fail_json(
msg='Error while deleting kafka consumer group offset: %s' % e
)
except Exception:
e = get_exception()
module.fail_json(
msg='Seomthing went wrong: %s ' % e
)
finally:
if manager:
manager.close()
maybe_clean_kafka_ssl_files(params)

if(changed):
msg = 'topics and partitions (%s) successfully deleted ' \
'for consumer group (%s)' % (topics, consumer_group)
else:
msg = 'nothing to do for consumer group %s and topics ' \
'partitions (%s)' % (consumer_group, topics)

module.exit_json(changed=changed, msg=msg)


if __name__ == '__main__':
main()
64 changes: 63 additions & 1 deletion module_utils/kafka_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools
import json

from pkg_resources import parse_version
import time

Expand All @@ -8,7 +9,8 @@
ListPartitionReassignmentsRequest_v0,
DescribeConfigsRequest_v1,
DescribeClientQuotasRequest_v0,
AlterClientQuotasRequest_v0
AlterClientQuotasRequest_v0,
OffsetDeleteRequest_v0
)
from kafka.client_async import KafkaClient
from kazoo.client import KazooClient
Expand All @@ -27,9 +29,15 @@
CreatePartitionsRequest_v0,
AlterConfigsRequest_v0
)

from kafka.protocol.offset import (
OffsetRequest_v1
)

from kafka.protocol.commit import (
OffsetFetchRequest_v2, GroupCoordinatorRequest_v0
)

from kafka.protocol.group import MemberAssignment, ProtocolMetadata
import kafka.errors
from kafka.errors import IllegalArgumentError
Expand Down Expand Up @@ -183,6 +191,60 @@ def delete_topics(self, topics, timeout=None):
)
)

def get_coordinator_for_consumer_group(self, consumer_group=None):

# Identify which broker is coordinating this consumer group
response = self.send_request_and_get_response(
GroupCoordinatorRequest_v0(consumer_group)
)
if response.error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error getting coordinator for consumer group %s.' % (
consumer_group)
)
return response.coordinator_id

def delete_group_offset(self, group_id, topics, check_mode):

consumer_coordinator = self.get_coordinator_for_consumer_group(
group_id)

consumed_offsets = self.send_request_and_get_response(
OffsetFetchRequest_v2(group_id, None), consumer_coordinator)

consumed_topics = [e[0] for e in consumed_offsets.topics] # topic name

changed = False

topics_partitions = []
for topic in topics:
topic_name = topic['name']
partitons = topic.get('partitions')
if(partitons is None or not partitons):
partitons = list(self.get_partitions_for_topic(topic_name)
.keys())
topics_partitions.append((topic_name, partitons))
if(topic_name in consumed_topics):
changed = True

if changed:
request = OffsetDeleteRequest_v0(
group_id=group_id, topics=topics_partitions)

if not check_mode:
response = self.send_request_and_get_response(
request, consumer_coordinator)
if response.error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while deleting consumer group %s.' % (
group_id)
)
return changed

@staticmethod
def _ensure_partitions_error_code_0(response):
raise KafkaManagerError(response.topics)

@staticmethod
def _convert_create_acls_resource_request_v0(acl_resource):
if acl_resource.operation == ACLOperation.ANY:
Expand Down
30 changes: 30 additions & 0 deletions module_utils/kafka_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,36 @@ class DescribeClientQuotasResponse_v0(Response):
)


class OffsetDeleteResponse_v0(Response):
API_KEY = 47
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('throttle_time_ms', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16)))))
)


class OffsetDeleteRequest_v0(Request):
API_KEY = 47
API_VERSION = 0
RESPONSE_TYPE = OffsetDeleteResponse_v0
SCHEMA = Schema(
('group_id', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(Int32))
))
)


API_KEYS[47] = 'OffsetDelete'


class DescribeClientQuotasRequest_v0(Request):
API_KEY = 48
API_VERSION = 0
Expand Down
1 change: 1 addition & 0 deletions molecule/default/molecule.yml
Original file line number Diff line number Diff line change
Expand Up @@ -323,5 +323,6 @@ verifier:
n: "auto"
group: "${PYTEST_SPLIT_GROUP:-1}"
splits: "${PYTEST_SPLIT_SPLITS:-1}"
timeout: "${PYTEST_TIMEOUT_S:-600}"
lint:
name: flake8
Loading

0 comments on commit 291ba00

Please sign in to comment.