Skip to content

Commit

Permalink
Add some options to kafka_info to manage retrocompatibility with exis…
Browse files Browse the repository at this point in the history
…ting module (#127)
  • Loading branch information
ryarnyah authored Feb 19, 2022
1 parent 0a7560b commit 26f8905
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 13 deletions.
14 changes: 13 additions & 1 deletion library/kafka_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@
required: True
choices: [topic, broker, consumer_group, acl,
topic-config]
include_defaults:
description:
- 'Include defaults configuration when using topic-config resource'
required: False
default: True
include_internal:
description:
- 'Include internal topics when using topic or topic-config resource'
required: False
default: False
''' + DOCUMENTATION_COMMON

EXAMPLES = '''
Expand Down Expand Up @@ -68,6 +78,8 @@ def main():
],
required=True
),
include_defaults=dict(type='bool', default=True),
include_internal=dict(type='bool', default=False),
**module_commons
)
)
Expand All @@ -78,7 +90,7 @@ def main():

try:
manager = get_manager_from_params(params)
results = manager.get_resource(resource)
results = manager.get_resource(resource, params)
except KafkaError:
e = get_exception()
module.fail_json(
Expand Down
26 changes: 14 additions & 12 deletions module_utils/kafka_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,11 +517,12 @@ def get_config_for_topics(self, topics, include_defaults=False):
topics_configs[resource_name] = current_config
return topics_configs

def get_topics(self):
def get_topics(self, include_internal=True):
"""
Returns the topics list
"""
return self.client.cluster.topics(exclude_internal_topics=False)
return self.client.cluster.topics(
exclude_internal_topics=(not include_internal))

def get_total_partitions_for_topic(self, topic):
"""
Expand Down Expand Up @@ -1041,7 +1042,7 @@ def generate_consumer_groups_for_broker(broker, response):
consumer_groups[gid] = group
return consumer_groups

def get_consumer_groups_resource(self):
def get_consumer_groups_resource(self, params):
"""
Return a dict object containing information about consumer groups and
following this structure:
Expand Down Expand Up @@ -1125,7 +1126,7 @@ def get_consumer_groups_resource(self):

return consumer_groups

def get_brokers_resource(self):
def get_brokers_resource(self, params):
"""
Return a dict object containing information about brokers and
following this structure:
Expand All @@ -1149,7 +1150,7 @@ def get_brokers_resource(self):
brokers[broker.nodeId] = broker._asdict()
return brokers

def get_topics_config(self):
def get_topics_config(self, params):
"""
Return a dict object containing information about configuration
of topics and partitions, and following this structure:
Expand All @@ -1159,11 +1160,12 @@ def get_topics_config(self):
}
}
"""
topics = self.get_topics()
topics = self.get_topics(params['include_internal'])
return self.get_config_for_topics({
topic: {} for topic in topics})
topic: {} for topic in topics},
include_defaults=params['include_defaults'])

def get_topics_resource(self):
def get_topics_resource(self, params):
"""
Return a dict object containing information about topics and partitions,
and following this structure:
Expand All @@ -1183,7 +1185,7 @@ def get_topics_resource(self):
}
}
"""
all_topics = self.get_topics()
all_topics = self.get_topics(params['include_internal'])
topics_configs = self.get_config_for_topics(
{topic: dict() for topic in all_topics}, include_defaults=True)

Expand Down Expand Up @@ -1275,7 +1277,7 @@ def get_topics_resource(self):
]['latest_offset'] = partition['offset']
return topics

def get_acls_resource(self):
def get_acls_resource(self, params):
"""
Return a dict object containing information about acls, following this
structure:
Expand Down Expand Up @@ -1372,11 +1374,11 @@ def resource_to_func(self):
'acl': self.get_acls_resource
}

def get_resource(self, resource):
def get_resource(self, resource, params):
if resource not in self.resource_to_func:
raise ValueError('Unexpected resource "%s"' % resource)

return self.resource_to_func[resource]()
return self.resource_to_func[resource](params)

def ensure_topics(self, topics):
topics_changed = set()
Expand Down
79 changes: 79 additions & 0 deletions molecule/default/tests/test_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ def test_kafka_info_topic(host):
# Then
for r in results:
assert topic_name in r['ansible_module_results']
assert '__consumer_offsets' not in r['ansible_module_results']
for name, topic_info in r['ansible_module_results'].items():
for partition, partition_info in topic_info.items():
assert 'earliest_offset' in partition_info
Expand All @@ -535,6 +536,48 @@ def test_kafka_info_topic(host):
assert not partition_info['unavailable_partition']


def test_kafka_info_topic_include_internal(host):
"""
Check if can get info on topic
"""
# Given
topic_name = get_topic_name()
ensure_topic(
host,
topic_defaut_configuration,
topic_name
)
time.sleep(0.3)

topic_test_name = get_topic_name()
topic_test_configuration = topic_defaut_configuration.copy()
topic_test_configuration.update({
'options': {
'min.insync.replicas': 2
}
})
ensure_topic(
host,
topic_test_configuration,
topic_test_name
)
time.sleep(0.3)
produce_and_consume_topic(topic_name, 10, get_consumer_group())
time.sleep(0.3)
# When
results = call_kafka_info(
host,
{
'resource': 'topic',
'include_internal': True
}
)
# Then
for r in results:
assert topic_name in r['ansible_module_results']
assert '__consumer_offsets' in r['ansible_module_results']


def test_kafka_info_topics_config(host):
"""
Check if can get config on topic.
Expand Down Expand Up @@ -566,6 +609,42 @@ def test_kafka_info_topics_config(host):
for name, topic_config in r['ansible_module_results'].items():
if name == topic_name:
assert int(topic_config['retention.ms']) == 66574936
assert 'min.insync.replicas' in topic_config


def test_kafka_info_topics_config_not_include_defaults(host):
"""
Check if can get config on topic.
"""
# Given
topic_name = get_topic_name()
test_topic_configuration = topic_defaut_configuration.copy()
test_topic_configuration.update({
'options': {
'retention.ms': 66574936
}
})
ensure_topic(
host,
test_topic_configuration,
topic_name
)
time.sleep(0.3)
# When
results = call_kafka_info(
host,
{
'resource': 'topic-config',
'include_defaults': False
},
)
# Then
for r in results:
assert topic_name in r['ansible_module_results']
for name, topic_config in r['ansible_module_results'].items():
if name == topic_name:
assert int(topic_config['retention.ms']) == 66574936
assert 'min.insync.replicas' not in topic_config


def test_kafka_info_brokers(host):
Expand Down

0 comments on commit 26f8905

Please sign in to comment.