diff --git a/README.md b/README.md
index 1e8f14e8..65edd59d 100644
--- a/README.md
+++ b/README.md
@@ -79,6 +79,32 @@ Here some examples on how to use this library:
       flush.ms: 12345
     state: "present"
 
+# Force reassign on topics
+- name: force reassign
+  kafka_topics:
+    api_version: "1.0.1"
+    zookeeper: "{{ hostvars['zookeeper']['ansible_eth0']['ipv4']['address'] }}:2181"
+    bootstrap_servers: "{{ hostvars['kafka1']['ansible_eth0']['ipv4']['address'] }}:9092,{{ hostvars['kafka2']['ansible_eth0']['ipv4']['address'] }}:9092"
+    topics:
+      - name: "test"
+        partitions: 2
+        replica_factor: 1
+        options:
+          retention.ms: 574930
+          flush.ms: 12345
+        state: "present"
+        force_reassign: True
+        preserve_leader: True
+      - name: "test2"
+        partitions: 2
+        replica_factor: 1
+        options:
+          retention.ms: 574930
+          flush.ms: 12345
+        state: "present"
+        force_reassign: True
+        preserve_leader: True
+
 # creates a topic 'test' with provided configuation for plaintext configured Kafka and Zookeeper
 - name: create topic
   kafka_topic:
diff --git a/library/kafka_topic.py b/library/kafka_topic.py
index 6460eb8f..40ebd810 100644
--- a/library/kafka_topic.py
+++ b/library/kafka_topic.py
@@ -54,6 +54,15 @@
     description:
       - 'when resource = topic, number of replica for the partitions of '
       - 'this resource.'
+  force_reassign:
+    description:
+      - 'force the reassignment of topic partitions across all brokers.'
+    default: False
+  preserve_leader:
+    description:
+      - 'when reassigning topic partitions, try to preserve each'
+      - 'partition leader to limit downtime.'
+    default: False
   state:
     description:
       - 'state of the managed resource.'
diff --git a/module_utils/kafka_lib_acl.py b/module_utils/kafka_lib_acl.py
index e5f7502f..5f73bc72 100644
--- a/module_utils/kafka_lib_acl.py
+++ b/module_utils/kafka_lib_acl.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 import collections
+import traceback
 
 from pkg_resources import parse_version
 
@@ -44,6 +45,7 @@ def process_module_acls(module, params=None):
     changed = False
     msg = ''
     warn = None
+    changes = {}
 
     try:
         manager = get_manager_from_params(params)
@@ -145,12 +147,18 @@ def process_module_acls(module, params=None):
             changed = True
             msg += ''.join(['acl %s successfully created. '
                             % acl for acl in acls_to_add])
+            changes.update({
+                'acls_added': [acl.to_dict() for acl in acls_to_add]
+            })
         if len(acls_to_delete) > 0:
             if not module.check_mode:
                 manager.delete_acls(acls_to_delete, api_version)
             changed = True
             msg += ''.join(['acl %s successfully deleted. '
                             % acl for acl in acls_to_delete])
+            changes.update({
+                'acls_deleted': [acl.to_dict() for acl in acls_to_delete]
+            })
     except KafkaError:
         e = get_exception()
         module.fail_json(
@@ -159,7 +167,8 @@ def process_module_acls(module, params=None):
     except Exception:
         e = get_exception()
         module.fail_json(
-            msg='Something went wrong: %s' % e
+            msg='Something went wrong: (%s) %s' % (e, traceback.format_exc()),
+            changes=changes
         )
     finally:
         manager.close()
@@ -172,4 +181,4 @@ def process_module_acls(module, params=None):
     if warn is not None:
         module.warn(warn)
 
-    module.exit_json(changed=changed, msg=msg)
+    module.exit_json(changed=changed, msg=msg, changes=changes)
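Reviewer note: with the `changes` key added above, a run that deletes a single ACL returns a result shaped roughly like the sketch below. Only `changed`, `msg` and the `changes`/`acls_deleted` keys come from this diff; the fields inside each entry depend on what `Acl.to_dict()` exposes, so the inner keys here are illustrative only.

```python
# Hypothetical module result after one ACL deletion; the inner ACL
# fields are assumptions about Acl.to_dict(), not taken from the diff.
result = {
    'changed': True,
    'msg': 'acl ... successfully deleted. ',
    'changes': {
        'acls_deleted': [{
            'resource_name': 'test-topic',   # assumed to_dict() field
            'operation': 'WRITE',            # assumed to_dict() field
            'principal': 'User:alice',       # assumed to_dict() field
        }],
    },
}
```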
diff --git a/module_utils/kafka_lib_commons.py b/module_utils/kafka_lib_commons.py
index ca4d7a94..cc8341c2 100644
--- a/module_utils/kafka_lib_commons.py
+++ b/module_utils/kafka_lib_commons.py
@@ -68,7 +68,7 @@
   request_timeout_ms:
     description:
       - 'timeout for kafka client requests'
-    default: 30000
+    default: 60000
   connections_max_idle_ms:
     description:
       - 'close idle connections after'
@@ -80,6 +80,10 @@
     replica_factor=dict(type='int', required=False, default=0),
 
+    force_reassign=dict(type='bool', required=False, default=False),
+
+    preserve_leader=dict(type='bool', required=False, default=False),
+
     options=dict(required=False, type='dict', default={}),
 
     kafka_sleep_time=dict(type='int', required=False, default=5),
@@ -211,7 +215,7 @@
     sasl_kerberos_service_name=dict(type='str', required=False),
 
-    request_timeout_ms=dict(type='int', default=30000),
+    request_timeout_ms=dict(type='int', default=60000),
     connections_max_idle_ms=dict(type='int', default=540000)
 )
diff --git a/module_utils/kafka_lib_quotas.py b/module_utils/kafka_lib_quotas.py
index 6f10332c..0b03b4d6 100644
--- a/module_utils/kafka_lib_quotas.py
+++ b/module_utils/kafka_lib_quotas.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import traceback
 
 from kafka.errors import KafkaError
 from ansible.module_utils.pycompat24 import get_exception
@@ -95,7 +96,8 @@ def process_module_quotas(module, params=None):
     except KafkaError:
         e = get_exception()
         module.fail_json(
-            msg='Unable to initialize Kafka manager: %s' % e
+            msg='Something went wrong: (%s) %s' % (e, traceback.format_exc()),
+            changes=changes
         )
     except Exception:
         e = get_exception()
diff --git a/module_utils/kafka_lib_topic.py b/module_utils/kafka_lib_topic.py
index e1f04289..ae680102 100644
--- a/module_utils/kafka_lib_topic.py
+++ b/module_utils/kafka_lib_topic.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 import collections
+import traceback
 
 from kafka.errors import KafkaError
 
@@ -32,6 +33,7 @@ def process_module_topics(module, params=None):
     changed = False
     msg = ''
     warn = None
+    changes = {}
 
     try:
         manager = get_manager_from_params(params)
@@ -48,6 +50,9 @@ def process_module_topics(module, params=None):
             changed = True
             msg += ''.join(['topic %s successfully created. '
                             % topic['name'] for topic in topics_to_create])
+            changes.update({
+                'topic_created': topics_to_create
+            })
 
         topics_to_maybe_update = [
             topic for topic in topics
@@ -63,6 +68,10 @@ def process_module_topics(module, params=None):
             if changed:
                 msg += ''.join(['topic %s successfully updated. '
                                 % topic for topic in topics_changed])
+                changes.update({
+                    'topic_updated': topics_changed
+                })
+
         topics_to_delete = [
             topic for topic in topics
             if (topic['state'] == 'absent' and
@@ -82,6 +91,9 @@ def process_module_topics(module, params=None):
             changed = True
             msg += ''.join(['topic %s successfully deleted. '
                             % topic['name'] for topic in topics_to_delete])
+            changes.update({
+                'topic_deleted': topics_to_delete
+            })
     except KafkaError:
         e = get_exception()
         module.fail_json(
@@ -90,7 +102,8 @@ def process_module_topics(module, params=None):
     except Exception:
         e = get_exception()
         module.fail_json(
-            msg='Something went wrong: %s' % e
+            msg='Something went wrong: (%s) %s' % (e, traceback.format_exc()),
+            changes=changes
         )
     finally:
         manager.close()
@@ -103,7 +116,7 @@ def process_module_topics(module, params=None):
     if warn is not None and len(warn) > 0:
         module.warn(warn)
 
-    module.exit_json(changed=changed, msg=msg)
+    module.exit_json(changed=changed, msg=msg, changes=changes)
 
 
 def process_module_topic(module):
@@ -112,6 +125,8 @@ def process_module_topic(module):
         'name': params['name'],
         'partitions': params['partitions'],
         'replica_factor': params['replica_factor'],
+        'force_reassign': params['force_reassign'],
+        'preserve_leader': params['preserve_leader'],
         'state': params['state'],
         'options': params['options']
     }]
diff --git a/module_utils/kafka_manager.py b/module_utils/kafka_manager.py
index 02c2a24a..672be833 100644
--- a/module_utils/kafka_manager.py
+++ b/module_utils/kafka_manager.py
@@ -81,6 +81,7 @@ def __init__(self, **configs):
         self.zookeeper_max_retries = 5
         self.kafka_sleep_time = 5
         self.kafka_max_retries = 5
+        self.request_timeout_ms = configs['request_timeout_ms']
         self.client = KafkaClient(**configs)
         self.refresh()
 
@@ -109,7 +110,8 @@ def close_zk_client(self):
         """
         Closes Zookeeper client
         """
-        self.zk_client.stop()
+        if self.zk_client is not None:
+            self.zk_client.stop()
 
     def close_kafka_client(self):
         """
@@ -137,13 +139,11 @@ def refresh(self):
                 % fut.exception
             )
 
-    def create_topics(self, topics, timeout=None):
+    def create_topics(self, topics):
         """
         Creates a topic
         Usable for Kafka version >= 0.10.1
         """
-        if timeout is None:
-            timeout = self.DEFAULT_TIMEOUT
         request = CreateTopicsRequest_v0(
             create_topic_requests=[(
                 topic['name'],
@@ -153,7 +153,7 @@ def create_topics(self, topics):
                 if 'replica_assignment' in topic else [],
                 topic['options'].items() if 'options' in topic else []
             ) for topic in topics],
-            timeout=timeout
+            timeout=self.request_timeout_ms
         )
         response = self.send_request_and_get_response(request)
 
@@ -167,17 +167,15 @@ def create_topics(self, topics):
             )
         )
 
-    def delete_topics(self, topics, timeout=None):
+    def delete_topics(self, topics):
         """
         Deletes a topic
         Usable for Kafka version >= 0.10.1
         Need to know which broker is controller for topic
         """
-        if timeout is None:
-            timeout = self.DEFAULT_TIMEOUT
         request = DeleteTopicsRequest_v0(topics=[topic['name']
                                                  for topic in topics],
-                                         timeout=timeout)
+                                         timeout=self.request_timeout_ms)
         response = self.send_request_and_get_response(request)
 
         for topic, error_code in response.topic_error_codes:
@@ -627,10 +625,12 @@ def is_topics_replication_need_update(self, topics):
         """
         topics_need_update = []
 
-        for topic_name, replica_factor in topics.items():
-            for _id, part in self.get_partitions_for_topic(topic_name).items():
-                _topic, _partition, _leader, replicas, _isr, _error = part
-                if len(replicas) != replica_factor:
+        for topic_name, options in topics.items():
+            for _id, metadata in self.get_partitions_for_topic(
+                    topic_name).items():
+                _topic, _partition, _leader, replicas, _isr, _error = metadata
+                if (len(replicas) != options['replica_factor']
+                        or options['force_reassign']):
                     topics_need_update.append(topic_name)
 
         return topics_need_update
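Reviewer note: the eligibility check above boils down to a two-condition predicate, so a topic is scheduled for reassignment when any partition's replica list is the wrong size, or unconditionally when `force_reassign` is set. The sketch below restates it as a standalone function (a hypothetical helper, not part of the module) to make the two triggers explicit.

```python
def needs_reassignment(replicas, options):
    """Restates the per-partition check in is_topics_replication_need_update."""
    return (len(replicas) != options['replica_factor']
            or options['force_reassign'])


# replica list too short -> reassign
assert needs_reassignment([1], {'replica_factor': 2,
                                'force_reassign': False})
# matching size, but forced -> reassign anyway
assert needs_reassignment([1, 2], {'replica_factor': 2,
                                   'force_reassign': True})
# matching size, not forced -> leave alone
assert not needs_reassignment([1, 2], {'replica_factor': 2,
                                       'force_reassign': False})
```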
@@ -670,7 +670,7 @@ def update_topics_partitions(self, topics):
         request = CreatePartitionsRequest_v0(
             topic_partitions=topics_assignments,
-            timeout=self.DEFAULT_TIMEOUT,
+            timeout=self.request_timeout_ms,
             validate_only=False
         )
         response = self.send_request_and_get_response(request)
@@ -714,46 +714,69 @@ def update_topics_configuration(self, topics):
             )
         self.refresh()
 
-    def get_assignment_for_replica_factor_update(self, topic_name,
-                                                 replica_factor):
+    def get_assignment_for_replica_factor_update(self, topics,
+                                                 topics_configuration):
         """
         Generates a json assignment based on replica_factor given to update
         replicas for a topic. Uses all brokers available and distributes
         them as replicas using a round robin method.
         """
+        assignments = []
         all_replicas = []
         partitions = []
+        for node_id, _, _, _ in self.get_brokers():
+            all_replicas.append(node_id)
+        brokers_iterator = itertools.cycle(all_replicas)
 
-        if replica_factor > self.get_total_brokers():
-            raise KafkaManagerError(
-                'Error while updating topic \'%s\' replication factor : '
-                'replication factor \'%s\' is more than available brokers '
-                '\'%s\'' % (
-                    topic_name,
-                    replica_factor,
-                    self.get_total_brokers()
-                )
-            )
-        else:
-            for node_id, _, _, _ in self.get_brokers():
-                all_replicas.append(node_id)
-            brokers_iterator = itertools.cycle(all_replicas)
-            for _, part in self.get_partitions_for_topic(topic_name).items():
-                _, partition, _, _, _, _ = part
-                replicas = []
-                for _i in range(replica_factor):
-                    replicas.append(next(brokers_iterator))
-                assign_tmp = (
-                    partition,
-                    replicas,
-                    {}
-                )
-                partitions.append(assign_tmp)
+        for topic_name, options in topics.items():
+            replica_factor = options['replica_factor']
+            preserve_leader = options['preserve_leader']
+            partitions = []
+            if replica_factor > self.get_total_brokers():
+                raise KafkaManagerError(
+                    'Error while updating topic \'%s\' replication factor: '
+                    'replication factor \'%s\' is more than available '
+                    'brokers \'%s\'' % (
+                        topic_name,
+                        replica_factor,
+                        self.get_total_brokers()
+                    )
+                )
+            else:
+                for _, metadata in self.get_partitions_for_topic(
+                        topic_name).items():
+                    _, partition, leader, _, _, _ = metadata
+                    partition_replica_factor = replica_factor
+                    replicas = []
+                    if preserve_leader:
+                        partition_replica_factor -= 1
+                        replicas.append(leader)
+                    for _i in range(partition_replica_factor):
+                        broker = next(brokers_iterator)
+                        if preserve_leader and broker == leader:
+                            broker = next(brokers_iterator)
+                        replicas.append(broker)
+                    current_assignment = topics_configuration.get(
+                        (topic_name, partition))
+                    if (current_assignment is not None and
+                            sorted(replicas) != sorted(current_assignment)):
+                        assign_tmp = (
+                            partition,
+                            replicas,
+                            {}
+                        )
+                        partitions.append(assign_tmp)
 
-        return [(topic_name, partitions, {})]
+            if len(partitions) > 0:
+                assignments.append((topic_name, partitions, {}))
+        if len(assignments) == 0:
+            return None
+        return assignments
 
-    def get_assignment_for_replica_factor_update_with_zk(self, topics):
+    def get_assignment_for_replica_factor_update_with_zk(self, topics,
+                                                         topics_configuration):
         """
         Generates a json assignment based on replica_factor given to update
         replicas for a topic.
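Reviewer note: the leader-preserving round robin in the hunk above can be illustrated in isolation. The current leader is pinned as the first replica (Kafka treats the first replica in an assignment as the preferred leader), and the remaining slots are filled from a broker cycle, skipping the leader so it is not assigned twice. A minimal sketch under those assumptions, independent of `KafkaManager`; like the hunk itself, it assumes at least two brokers when preserving the leader.

```python
import itertools


def assign_partition(leader, replica_factor, brokers_iterator,
                     preserve_leader=True):
    """Build one partition's replica list, leader first if preserved."""
    replicas = []
    slots = replica_factor
    if preserve_leader:
        slots -= 1
        replicas.append(leader)
    for _ in range(slots):
        broker = next(brokers_iterator)
        if preserve_leader and broker == leader:
            # never assign the pinned leader a second time
            broker = next(brokers_iterator)
        replicas.append(broker)
    return replicas


cycle = itertools.cycle([1, 2, 3])
print(assign_partition(leader=2, replica_factor=2, brokers_iterator=cycle))
# -> [2, 1]: leader 2 stays first, broker 1 is added as a follower
```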
@@ -763,7 +786,14 @@ def get_assignment_for_replica_factor_update_with_zk(self, topics):
         all_replicas = []
         assign = {'partitions': [], 'version': 1}
 
-        for topic_name, replica_factor in topics.items():
+        for node_id, _, _, _ in self.get_brokers():
+            all_replicas.append(node_id)
+        brokers_iterator = itertools.cycle(all_replicas)
+
+        for topic_name, options in topics.items():
+            replica_factor = options['replica_factor']
+            preserve_leader = options['preserve_leader']
+
             if replica_factor > self.get_total_brokers():
                 raise KafkaManagerError(
                     'Error while updating topic \'%s\' replication factor : '
@@ -775,21 +805,34 @@ def get_assignment_for_replica_factor_update_with_zk(self, topics):
                     )
                 )
             else:
-                for node_id, _, _, _ in self.get_brokers():
-                    all_replicas.append(node_id)
-                brokers_iterator = itertools.cycle(all_replicas)
-                for _, part in self.get_partitions_for_topic(
+                for _, metadata in self.get_partitions_for_topic(
                         topic_name).items():
-                    _, partition, _, _, _, _ = part
-                    assign_tmp = {
-                        'topic': topic_name,
-                        'partition': partition,
-                        'replicas': []
-                    }
-                    for _i in range(replica_factor):
-                        assign_tmp['replicas'].append(next(brokers_iterator))
-                    assign['partitions'].append(assign_tmp)
-
+                    _, partition, leader, _, _, _ = metadata
+                    partition_replica_factor = replica_factor
+                    replicas = []
+                    if preserve_leader:
+                        partition_replica_factor -= 1
+                        replicas.append(leader)
+                    for _i in range(partition_replica_factor):
+                        broker = next(brokers_iterator)
+                        if preserve_leader and broker == leader:
+                            broker = next(brokers_iterator)
+                        replicas.append(broker)
+                    current_assignment = topics_configuration.get(
+                        (topic_name, partition))
+                    if (current_assignment is not None and
+                            sorted(replicas) != sorted(current_assignment)):
+                        assign_tmp = {
+                            'topic': topic_name,
+                            'partition': partition,
+                            'replicas': replicas
+                        }
+                        assign['partitions'].append(assign_tmp)
+
+        if len(assign['partitions']) == 0:
+            return None
         return json.dumps(assign, ensure_ascii=False).encode('utf-8')
 
     def get_assignment_for_partition_update(self, topic_name, partitions):
@@ -829,7 +872,7 @@ def wait_for_partition_assignement(self):
                 retries < self.kafka_max_retries
             ):
                 request = ListPartitionReassignmentsRequest_v0(
-                    timeout_ms=60000,
+                    timeout_ms=self.request_timeout_ms,
                     topics=None,
                     tags={}
                 )
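Reviewer note: on the Zookeeper path, the generated payload is the standard version-1 document that Kafka's controller reads from the `/admin/reassign_partitions` znode, and after this change it only includes partitions whose sorted replica set actually differs from the current assignment. A sketch of what the hunk above serializes (topic names and broker ids illustrative; the first broker in each list is the preferred leader):

```python
import json

assign = {
    'version': 1,
    'partitions': [
        # one entry per partition whose replica set changes
        {'topic': 'test', 'partition': 0, 'replicas': [2, 1]},
        {'topic': 'test', 'partition': 1, 'replicas': [3, 2]},
    ],
}
# same serialization as the method's return value
payload = json.dumps(assign, ensure_ascii=False).encode('utf-8')
```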
""" + topics_configuration = {} + for topic in topics: + partitions = self.get_partitions_for_topic(topic) + for partition, metadata in partitions.items(): + _, _, _, replicas, _, _ = metadata + topics_configuration[(topic, partition)] = replicas if (parse_version(self.get_api_version()) >= parse_version('2.4.0')): - assign = [] - for name, replica_factor in topics.items(): - assign += self.get_assignment_for_replica_factor_update( - name, replica_factor - ) - request = AlterPartitionReassignmentsRequest_v0( - timeout_ms=60000, - topics=assign, - tags={} + assign = self.get_assignment_for_replica_factor_update( + topics, + topics_configuration ) - self.wait_for_partition_assignement() - self.send_request_and_get_response(request) - self.wait_for_partition_assignement() + if assign is not None: + request = AlterPartitionReassignmentsRequest_v0( + timeout_ms=self.request_timeout_ms, + topics=assign, + tags={} + ) + self.wait_for_partition_assignement() + self.send_request_and_get_response(request) + self.wait_for_partition_assignement() elif self.zk_configuration is not None: try: json_assignment = ( self.get_assignment_for_replica_factor_update_with_zk( - topics + topics, + topics_configuration ) ) - self.init_zk_client() - self.wait_for_znode_assignment() - self.zk_client.create(self.ZK_REASSIGN_NODE, json_assignment) - self.wait_for_znode_assignment() + if json_assignment is not None: + self.init_zk_client() + self.wait_for_znode_assignment() + self.zk_client.create(self.ZK_REASSIGN_NODE, + json_assignment) + self.wait_for_znode_assignment() finally: self.close_zk_client() else: @@ -1365,12 +1417,18 @@ def ensure_topics(self, topics): topics_replication_need_update = \ self.is_topics_replication_need_update({ - topic['name']: topic['replica_factor'] + topic['name']: { + 'replica_factor': topic['replica_factor'], + 'force_reassign': topic.get('force_reassign', False) + } for topic in topics }) if len(topics_replication_need_update) > 0: self.update_admin_assignments({ - topic['name']: topic['replica_factor'] + topic['name']: { + 'replica_factor': topic['replica_factor'], + 'preserve_leader': topic.get('preserve_leader', False) + } for topic in topics if (topic['name'] in topics_replication_need_update) }) diff --git a/molecule/default/tests/ansible_utils.py b/molecule/default/tests/ansible_utils.py index bf7711c6..7bdbd0be 100644 --- a/molecule/default/tests/ansible_utils.py +++ b/molecule/default/tests/ansible_utils.py @@ -196,11 +196,13 @@ def call_kafka_stat_lag( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_stat_lag', - "{{ module_args }}", check=False, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_stat_lag', + "{{ module_args }}", check=False, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) return results @@ -227,11 +229,14 @@ def call_kafka_info( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_info', - "{{ module_args }}", check=False, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_info', + "{{ module_args }}", check=False, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) + return results @@ -264,11 +269,13 @@ def call_kafka_lib( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_lib', - "{{ 
module_args }}", check=check, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_lib', + "{{ module_args }}", check=check, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) return results @@ -301,11 +308,13 @@ def call_kafka_topic_with_zk( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_topic', - "{{ module_args }}", check=check, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_topic', + "{{ module_args }}", check=check, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) return results @@ -339,11 +348,14 @@ def call_kafka_topic( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_topic', - "{{ module_args }}", check=check, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_topic', + "{{ module_args }}", check=check, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) + return results @@ -377,11 +389,14 @@ def call_kafka_topics( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_topics', - "{{ module_args }}", check=check, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_topics', + "{{ module_args }}", check=check, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) + return results @@ -420,11 +435,14 @@ def call_kafka_quotas( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_quotas', - "{{ module_args }}", check=check, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_quotas', + "{{ module_args }}", check=check, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) + return results @@ -453,8 +471,11 @@ def call_kafka_consumer_group( } module_args.update(args) module_args = "{{ %s }}" % json.dumps(module_args) - results.append(host.ansible('kafka_consumer_group', - module_args, check=check)) + result = host.ansible('kafka_consumer_group', + module_args, check=check) + result.update({'module_args': module_args, 'env': env}) + results.append(result) + return results @@ -486,11 +507,14 @@ def call_kafka_acl( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_acl', - "{{ module_args }}", check=check, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_acl', + "{{ module_args }}", check=check, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) + return results @@ -518,11 +542,14 @@ def call_kafka_acls( 'bootstrap_servers': env['kfk_addr'], } module_args.update(args) - results.append(host.ansible('kafka_acls', - "{{ module_args }}", check=check, - extra_vars={ - 'module_args': module_args - })) + result = host.ansible('kafka_acls', + "{{ module_args }}", check=check, + extra_vars={ + 'module_args': module_args + }) + result.update({'module_args': module_args, 'env': env}) + results.append(result) + return results diff --git a/molecule/default/tests/test_topic_default.py 
index c5e0fae7..e105d26c 100644
--- a/molecule/default/tests/test_topic_default.py
+++ b/molecule/default/tests/test_topic_default.py
@@ -62,6 +62,81 @@ def test_update_replica_factor(host):
                                topic_name, kfk_addr)
 
 
+def test_update_replica_factor_preserve_leader(host):
+    """
+    Check that the replication factor can be updated while preserving
+    each partition leader.
+    """
+    # Given
+    topic_name = get_topic_name()
+    ensure_kafka_topic(
+        host,
+        topic_defaut_configuration,
+        topic_name
+    )
+    time.sleep(0.3)
+    # When
+    test_topic_configuration = topic_defaut_configuration.copy()
+    test_topic_configuration.update({
+        'replica_factor': 2,
+        'preserve_leader': True
+    })
+    ensure_idempotency(
+        ensure_kafka_topic_with_zk,
+        host,
+        test_topic_configuration,
+        topic_name
+    )
+    time.sleep(0.3)
+    # Then
+    for host, host_vars in kafka_hosts.items():
+        kfk_addr = "%s:9092" % \
+            host_vars['ansible_eth0']['ipv4']['address']['__ansible_unsafe']
+        check_configured_topic(host, test_topic_configuration,
+                               topic_name, kfk_addr)
+
+
+def test_update_replica_factor_force_reassign(host):
+    """
+    Check that partitions are reassigned when force_reassign is set,
+    even though the configuration is otherwise unchanged.
+    """
+    # Given
+    topic_name = get_topic_name()
+    ensure_kafka_topic(
+        host,
+        topic_defaut_configuration,
+        topic_name
+    )
+    time.sleep(0.3)
+    # When
+    test_topic_configuration = topic_defaut_configuration.copy()
+    test_topic_configuration.update({
+        'replica_factor': 2,
+        'force_reassign': True,
+        'preserve_leader': True
+    })
+    ensure_kafka_topic_with_zk(
+        host,
+        test_topic_configuration,
+        topic_name
+    )
+    time.sleep(0.3)
+    # Re-apply the same configuration: force_reassign alone should
+    # still trigger a change.
+    test_topic_configuration.update({
+        'replica_factor': 2,
+        'force_reassign': True,
+        'preserve_leader': True
+    })
+    results = ensure_kafka_topic_with_zk(
+        host,
+        test_topic_configuration,
+        topic_name
+    )
+    time.sleep(0.3)
+    # Then
+    for result in results:
+        assert result['changed'], str(result)
+        assert topic_name in result['changes']['topic_updated'], str(result)
+
+
 def test_update_partitions(host):
     """
     Check if can update partitions numbers