diff --git a/module_utils/kafka_manager.py b/module_utils/kafka_manager.py index d02df789..0f91f26a 100644 --- a/module_utils/kafka_manager.py +++ b/module_utils/kafka_manager.py @@ -728,6 +728,7 @@ def get_assignment_for_replica_factor_update(self, topics, for node_id, _, _, _ in self.get_brokers(): all_replicas.append(node_id) brokers_iterator = itertools.cycle(all_replicas) + overflow_nodes = [] for topic_name, options in topics.items(): partitions = [] @@ -752,9 +753,12 @@ def get_assignment_for_replica_factor_update(self, topics, if preserve_leader: partition_replica_factor -= 1 replicas.append(leader) + overflow_nodes.append(leader) for _i in range(partition_replica_factor): broker = next(brokers_iterator) - if preserve_leader and broker == leader: + while broker in overflow_nodes or broker in replicas: + if broker in overflow_nodes: + overflow_nodes.remove(broker) broker = next(brokers_iterator) replicas.append(broker) current_assignment = topics_configuration[( @@ -790,6 +794,7 @@ def get_assignment_for_replica_factor_update_with_zk(self, topics, for node_id, _, _, _ in self.get_brokers(): all_replicas.append(node_id) brokers_iterator = itertools.cycle(all_replicas) + overflow_nodes = [] for topic_name, options in topics.items(): replica_factor = options['replica_factor'] @@ -814,9 +819,12 @@ def get_assignment_for_replica_factor_update_with_zk(self, topics, if preserve_leader: partition_replica_factor -= 1 replicas.append(leader) + overflow_nodes.append(leader) for _i in range(partition_replica_factor): broker = next(brokers_iterator) - if preserve_leader and broker == leader: + while broker in overflow_nodes or broker in replicas: + if broker in overflow_nodes: + overflow_nodes.remove(broker) broker = next(brokers_iterator) replicas.append(broker) current_assignment = topics_configuration[( diff --git a/molecule/default/tests/test_topic_default.py b/molecule/default/tests/test_topic_default.py index 3bc9f81f..307aafc0 100644 --- a/molecule/default/tests/test_topic_default.py +++ b/molecule/default/tests/test_topic_default.py @@ -14,7 +14,8 @@ ensure_kafka_topic_with_zk, check_configured_topic, host_protocol_version, ensure_idempotency, - ensure_kafka_topics + ensure_kafka_topics, + call_kafka_info ) runner = testinfra.utils.ansible_runner.AnsibleRunner( @@ -139,6 +140,111 @@ def test_update_replica_factor_force_reassign(host): assert topic_name in result['changes']['topic_updated'], str(result) +def test_rebalance_all_partitions_preserve_leader(host): + """ + Check if can reassign all topics-partition with + leader conservation. + """ + # Given + test_topic_name = get_topic_name() + test_topic_configuration = topic_defaut_configuration.copy() + test_topic_configuration.update({ + 'partitions': 50, + 'replica_factor': 2, + 'preserve_leader': True + }) + ensure_kafka_topic( + host, + test_topic_configuration, + test_topic_name + ) + time.sleep(0.3) + topics_config = call_kafka_info( + host, + { + 'resource': 'topic-config', + 'include_internal': True, + 'include_defaults': False + } + ) + topics = call_kafka_info( + host, + { + 'resource': 'topic', + 'include_internal': True + } + ) + + all_topics = topics[0]['ansible_module_results'] + all_topics_config = topics_config[0]['ansible_module_results'] + test_topics = [] + for topic_name, partitions in all_topics.items(): + if topic_name == test_topic_name or topic_name.startswith('__'): + test_topics.append({ + 'name': topic_name, + 'partitions': len(partitions), + 'replica_factor': len(partitions['0']['replicas']), + 'options': all_topics_config[topic_name], + 'force_reassign': True, + 'preserve_leader': True, + 'state': 'present' + }) + existing_leaders = [] + for i, broker_topics in enumerate(topics): + existing_leaders.append({}) + for topic_name, partitions in ( + broker_topics['ansible_module_results'].items()): + if topic_name == test_topic_name or topic_name.startswith('__'): + for partition, config in partitions.items(): + existing_leaders[i][(topic_name, partition)] = \ + config['leader'] + # When + test_topics_configuration = { + 'topics': test_topics, + 'kafka_sleep_time': 5, + 'kafka_max_retries': 60 + } + ensure_kafka_topics( + host, + test_topics_configuration + ) + time.sleep(10) + # Then + topics = call_kafka_info( + host, + { + 'resource': 'topic', + 'include_internal': True + } + ) + for i, broker_topics in enumerate(topics): + resulting_replicas = {} + for topic_name, partitions in ( + topics[i]['ansible_module_results'].items()): + if topic_name == test_topic_name or topic_name.startswith('__'): + for partition, config in partitions.items(): + resulting_replicas[(topic_name, partition)] = \ + config['replicas'] + for topic_partition, leader in existing_leaders[i].items(): + assert leader in resulting_replicas[topic_partition] + total_partitions = 0 + partitions_per_broker = {} + for topic_name, partitions in ( + topics[i]['ansible_module_results'].items()): + if topic_name == test_topic_name or topic_name.startswith('__'): + for partition, config in partitions.items(): + for replica in config['replicas']: + if replica not in partitions_per_broker: + partitions_per_broker[replica] = 0 + partitions_per_broker[replica] += 1 + total_partitions += 1 + mean_partitions = int( + total_partitions * 1.0 / len(partitions_per_broker)) + for node_id, partitions in partitions_per_broker.items(): + assert partitions >= mean_partitions, partitions_per_broker + assert partitions <= (mean_partitions + 1), partitions_per_broker + + def test_update_partitions(host): """ Check if can update partitions numbers