Skip to content

Commit

Permalink
Be fairer when using the preserve-leader option (#129)
Browse files Browse the repository at this point in the history
* Fairer when using the preserve-leader option
  • Loading branch information
ryarnyah authored Mar 2, 2022
1 parent 26f8905 commit e4db6f3
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 3 deletions.
12 changes: 10 additions & 2 deletions module_utils/kafka_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ def get_assignment_for_replica_factor_update(self, topics,
for node_id, _, _, _ in self.get_brokers():
all_replicas.append(node_id)
brokers_iterator = itertools.cycle(all_replicas)
overflow_nodes = []

for topic_name, options in topics.items():
partitions = []
Expand All @@ -752,9 +753,12 @@ def get_assignment_for_replica_factor_update(self, topics,
if preserve_leader:
partition_replica_factor -= 1
replicas.append(leader)
overflow_nodes.append(leader)
for _i in range(partition_replica_factor):
broker = next(brokers_iterator)
if preserve_leader and broker == leader:
while broker in overflow_nodes or broker in replicas:
if broker in overflow_nodes:
overflow_nodes.remove(broker)
broker = next(brokers_iterator)
replicas.append(broker)
current_assignment = topics_configuration[(
Expand Down Expand Up @@ -790,6 +794,7 @@ def get_assignment_for_replica_factor_update_with_zk(self, topics,
for node_id, _, _, _ in self.get_brokers():
all_replicas.append(node_id)
brokers_iterator = itertools.cycle(all_replicas)
overflow_nodes = []

for topic_name, options in topics.items():
replica_factor = options['replica_factor']
Expand All @@ -814,9 +819,12 @@ def get_assignment_for_replica_factor_update_with_zk(self, topics,
if preserve_leader:
partition_replica_factor -= 1
replicas.append(leader)
overflow_nodes.append(leader)
for _i in range(partition_replica_factor):
broker = next(brokers_iterator)
if preserve_leader and broker == leader:
while broker in overflow_nodes or broker in replicas:
if broker in overflow_nodes:
overflow_nodes.remove(broker)
broker = next(brokers_iterator)
replicas.append(broker)
current_assignment = topics_configuration[(
Expand Down
108 changes: 107 additions & 1 deletion molecule/default/tests/test_topic_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
ensure_kafka_topic_with_zk,
check_configured_topic,
host_protocol_version, ensure_idempotency,
ensure_kafka_topics
ensure_kafka_topics,
call_kafka_info
)

runner = testinfra.utils.ansible_runner.AnsibleRunner(
Expand Down Expand Up @@ -139,6 +140,111 @@ def test_update_replica_factor_force_reassign(host):
assert topic_name in result['changes']['topic_updated'], str(result)


def test_rebalance_all_partitions_preserve_leader(host):
    """
    Check that a forced reassignment of every topic-partition with
    preserve_leader keeps each partition's leader inside its replica
    set while still spreading the replica load evenly over brokers.
    """
    # Given: a topic with enough partitions for the balance check to
    # be meaningful, created with leader preservation enabled.
    topic_to_check = get_topic_name()
    topic_options = topic_defaut_configuration.copy()
    topic_options.update({
        'partitions': 50,
        'replica_factor': 2,
        'preserve_leader': True
    })
    ensure_kafka_topic(
        host,
        topic_options,
        topic_to_check
    )
    time.sleep(0.3)

    def is_candidate(name):
        # Reassign the topic under test plus every internal topic.
        return name == topic_to_check or name.startswith('__')

    configs_result = call_kafka_info(
        host,
        {
            'resource': 'topic-config',
            'include_internal': True,
            'include_defaults': False
        }
    )
    topics_result = call_kafka_info(
        host,
        {
            'resource': 'topic',
            'include_internal': True
        }
    )

    topic_state = topics_result[0]['ansible_module_results']
    config_state = configs_result[0]['ansible_module_results']
    # Build one reassignment spec per selected topic, mirroring its
    # current partition count and replication factor.
    reassign_specs = [
        {
            'name': name,
            'partitions': len(parts),
            'replica_factor': len(parts['0']['replicas']),
            'options': config_state[name],
            'force_reassign': True,
            'preserve_leader': True,
            'state': 'present'
        }
        for name, parts in topic_state.items()
        if is_candidate(name)
    ]
    # Remember, per broker answer, which node led each partition
    # before the reassignment.
    leaders_before = []
    for broker_answer in topics_result:
        snapshot = {}
        for name, parts in (
                broker_answer['ansible_module_results'].items()):
            if is_candidate(name):
                for partition, state in parts.items():
                    snapshot[(name, partition)] = state['leader']
        leaders_before.append(snapshot)
    # When: force the reassignment while preserving leaders.
    ensure_kafka_topics(
        host,
        {
            'topics': reassign_specs,
            'kafka_sleep_time': 5,
            'kafka_max_retries': 60
        }
    )
    time.sleep(10)
    # Then: every previous leader is still a replica of its partition,
    # and each broker carries a fair share of the replica slots.
    topics_result = call_kafka_info(
        host,
        {
            'resource': 'topic',
            'include_internal': True
        }
    )
    for broker_index, broker_answer in enumerate(topics_result):
        replicas_now = {}
        for name, parts in (
                broker_answer['ansible_module_results'].items()):
            if is_candidate(name):
                for partition, state in parts.items():
                    replicas_now[(name, partition)] = state['replicas']
        for topic_partition, leader in (
                leaders_before[broker_index].items()):
            assert leader in replicas_now[topic_partition]
        replica_slots = 0
        per_broker = {}
        for name, parts in (
                broker_answer['ansible_module_results'].items()):
            if is_candidate(name):
                for partition, state in parts.items():
                    for node in state['replicas']:
                        per_broker[node] = per_broker.get(node, 0) + 1
                        replica_slots += 1
        mean_load = int(
            replica_slots * 1.0 / len(per_broker))
        for node_id, load in per_broker.items():
            assert load >= mean_load, per_broker
            assert load <= (mean_load + 1), per_broker


def test_update_partitions(host):
"""
Check if can update partitions numbers
Expand Down

0 comments on commit e4db6f3

Please sign in to comment.