Skip to content

Commit

Permalink
Feat/add kafka reassign partitions (#117)
Browse files Browse the repository at this point in the history
* Add possibility to force reassignment of topics partitions between newly added brokers

* Add traceback & changes for acls

* Add documentation on force_reassign & preserve_leader

* Use request timeout for all requests
  • Loading branch information
ryarnyah authored Dec 7, 2021
1 parent 012f62a commit 672ffc9
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 131 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,32 @@ Here some examples on how to use this library:
flush.ms: 12345
state: "present"
# Force reassign on topics
- name: force reassign
kafka_topics:
api_version: "1.0.1"
zookeeper: "{{ hostvars['zookeeper']['ansible_eth0']['ipv4']['address'] }}:2181"
bootstrap_servers: "{{ hostvars['kafka1']['ansible_eth0']['ipv4']['address'] }}:9092,{{ hostvars['kafka2']['ansible_eth0']['ipv4']['address'] }}:9092"
topics:
- name: "test"
partitions: 2
replica_factor: 1
options:
retention.ms: 574930
flush.ms: 12345
state: "present"
force_reassign: True
preserve_leader: True
- name: "test2"
partitions: 2
replica_factor: 1
options:
retention.ms: 574930
flush.ms: 12345
state: "present"
force_reassign: True
preserve_leader: True
# creates a topic 'test' with provided configuation for plaintext configured Kafka and Zookeeper
- name: create topic
kafka_topic:
Expand Down
9 changes: 9 additions & 0 deletions library/kafka_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@
description:
- 'when resource = topic, number of replica for the partitions of '
- 'this resource.'
force_reassign:
description:
- 'force reassign topic/partition between all the brokers.'
default: False
preserve_leader:
description:
- 'when reassign topic/partition try to preserve topic/partition'
- 'leader to limit downtime.'
default: False
state:
description:
- 'state of the managed resource.'
Expand Down
13 changes: 11 additions & 2 deletions module_utils/kafka_lib_acl.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import collections
import traceback

from pkg_resources import parse_version

Expand Down Expand Up @@ -44,6 +45,7 @@ def process_module_acls(module, params=None):
changed = False
msg = ''
warn = None
changes = {}

try:
manager = get_manager_from_params(params)
Expand Down Expand Up @@ -145,12 +147,18 @@ def process_module_acls(module, params=None):
changed = True
msg += ''.join(['acl %s successfully created. ' %
acl for acl in acls_to_add])
changes.update({
'acls_added': [acl.to_dict() for acl in acls_to_add]
})
if len(acls_to_delete) > 0:
if not module.check_mode:
manager.delete_acls(acls_to_delete, api_version)
changed = True
msg += ''.join(['acl %s successfully deleted. ' %
acl for acl in acls_to_delete])
changes.update({
'acls_deleted': [acl.to_dict() for acl in acls_to_delete]
})
except KafkaError:
e = get_exception()
module.fail_json(
Expand All @@ -159,7 +167,8 @@ def process_module_acls(module, params=None):
except Exception:
e = get_exception()
module.fail_json(
msg='Something went wrong: %s' % e
msg='Something went wrong: (%s) %s' % (e, traceback.format_exc(e)),
changes=changes
)
finally:
manager.close()
Expand All @@ -172,4 +181,4 @@ def process_module_acls(module, params=None):
if warn is not None:
module.warn(warn)

module.exit_json(changed=changed, msg=msg)
module.exit_json(changed=changed, msg=msg, changes=changes)
8 changes: 6 additions & 2 deletions module_utils/kafka_lib_commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
request_timeout_ms:
description:
- 'timeout for kafka client requests'
default: 30000
default: 60000
connections_max_idle_ms:
description:
- 'close idle connections after'
Expand All @@ -80,6 +80,10 @@

replica_factor=dict(type='int', required=False, default=0),

force_reassign=dict(type='bool', required=False, default=False),

preserve_leader=dict(type='bool', required=False, default=False),

options=dict(required=False, type='dict', default={}),

kafka_sleep_time=dict(type='int', required=False, default=5),
Expand Down Expand Up @@ -211,7 +215,7 @@

sasl_kerberos_service_name=dict(type='str', required=False),

request_timeout_ms=dict(type='int', default=30000),
request_timeout_ms=dict(type='int', default=60000),

connections_max_idle_ms=dict(type='int', default=540000)
)
Expand Down
4 changes: 3 additions & 1 deletion module_utils/kafka_lib_quotas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import traceback
from kafka.errors import KafkaError

from ansible.module_utils.pycompat24 import get_exception
Expand Down Expand Up @@ -95,7 +96,8 @@ def process_module_quotas(module, params=None):
except KafkaError:
e = get_exception()
module.fail_json(
msg='Unable to initialize Kafka manager: %s' % e
msg='Something went wrong: (%s) %s' % (e, traceback.format_exc(e)),
changes=changes
)
except Exception:
e = get_exception()
Expand Down
19 changes: 17 additions & 2 deletions module_utils/kafka_lib_topic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import collections
import traceback

from kafka.errors import KafkaError

Expand Down Expand Up @@ -32,6 +33,7 @@ def process_module_topics(module, params=None):
changed = False
msg = ''
warn = None
changes = {}

try:
manager = get_manager_from_params(params)
Expand All @@ -48,6 +50,9 @@ def process_module_topics(module, params=None):
changed = True
msg += ''.join(['topic %s successfully created. ' %
topic['name'] for topic in topics_to_create])
changes.update({
'topic_created': topics_to_create
})

topics_to_maybe_update = [
topic for topic in topics
Expand All @@ -63,6 +68,10 @@ def process_module_topics(module, params=None):
if changed:
msg += ''.join(['topic %s successfully updated. ' %
topic for topic in topics_changed])
changes.update({
'topic_updated': topics_changed
})

topics_to_delete = [
topic for topic in topics
if (topic['state'] == 'absent' and
Expand All @@ -82,6 +91,9 @@ def process_module_topics(module, params=None):
changed = True
msg += ''.join(['topic %s successfully deleted. ' %
topic['name'] for topic in topics_to_delete])
changes.update({
'topic_deleted': topics_to_delete
})
except KafkaError:
e = get_exception()
module.fail_json(
Expand All @@ -90,7 +102,8 @@ def process_module_topics(module, params=None):
except Exception:
e = get_exception()
module.fail_json(
msg='Something went wrong: %s' % e
msg='Something went wrong: (%s) %s' % (e, traceback.format_exc(e)),
changes=changes
)
finally:
manager.close()
Expand All @@ -103,7 +116,7 @@ def process_module_topics(module, params=None):
if warn is not None and len(warn) > 0:
module.warn(warn)

module.exit_json(changed=changed, msg=msg)
module.exit_json(changed=changed, msg=msg, changes=changes)


def process_module_topic(module):
Expand All @@ -112,6 +125,8 @@ def process_module_topic(module):
'name': params['name'],
'partitions': params['partitions'],
'replica_factor': params['replica_factor'],
'force_reassign': params['force_reassign'],
'preserve_leader': params['preserve_leader'],
'state': params['state'],
'options': params['options']
}]
Expand Down
Loading

0 comments on commit 672ffc9

Please sign in to comment.