diff --git a/library/kafka_info.py b/library/kafka_info.py index de6f455c..9e893497 100644 --- a/library/kafka_info.py +++ b/library/kafka_info.py @@ -36,6 +36,16 @@ required: True choices: [topic, broker, consumer_group, acl, topic-config] + include_defaults: + description: + - 'Include defaults configuration when using topic-config resource' + required: False + default: True + include_internal: + description: + - 'Include internal topics when using topic or topic-config resource' + required: False + default: False ''' + DOCUMENTATION_COMMON EXAMPLES = ''' @@ -68,6 +78,8 @@ def main(): ], required=True ), + include_defaults=dict(type='bool', default=True), + include_internal=dict(type='bool', default=False), **module_commons ) ) @@ -78,7 +90,7 @@ def main(): try: manager = get_manager_from_params(params) - results = manager.get_resource(resource) + results = manager.get_resource(resource, params) except KafkaError: e = get_exception() module.fail_json( diff --git a/module_utils/kafka_manager.py b/module_utils/kafka_manager.py index 7a92ce77..d02df789 100644 --- a/module_utils/kafka_manager.py +++ b/module_utils/kafka_manager.py @@ -517,11 +517,12 @@ def get_config_for_topics(self, topics, include_defaults=False): topics_configs[resource_name] = current_config return topics_configs - def get_topics(self): + def get_topics(self, include_internal=True): """ Returns the topics list """ - return self.client.cluster.topics(exclude_internal_topics=False) + return self.client.cluster.topics( + exclude_internal_topics=(not include_internal)) def get_total_partitions_for_topic(self, topic): """ @@ -1041,7 +1042,7 @@ def generate_consumer_groups_for_broker(broker, response): consumer_groups[gid] = group return consumer_groups - def get_consumer_groups_resource(self): + def get_consumer_groups_resource(self, params): """ Return a dict object containing information about consumer groups and following this structure: @@ -1125,7 +1126,7 @@ def get_consumer_groups_resource(self): return consumer_groups - def get_brokers_resource(self): + def get_brokers_resource(self, params): """ Return a dict object containing information about brokers and following this structure: @@ -1149,7 +1150,7 @@ def get_brokers_resource(self): brokers[broker.nodeId] = broker._asdict() return brokers - def get_topics_config(self): + def get_topics_config(self, params): """ Return a dict object containing information about configuration of topics and partitions, and following this structure: @@ -1159,11 +1160,12 @@ def get_topics_config(self): } } """ - topics = self.get_topics() + topics = self.get_topics(params['include_internal']) return self.get_config_for_topics({ - topic: {} for topic in topics}) + topic: {} for topic in topics}, + include_defaults=params['include_defaults']) - def get_topics_resource(self): + def get_topics_resource(self, params): """ Return a dict object containing information about topics and partitions, and following this structure: @@ -1183,7 +1185,7 @@ def get_topics_resource(self): } } """ - all_topics = self.get_topics() + all_topics = self.get_topics(params['include_internal']) topics_configs = self.get_config_for_topics( {topic: dict() for topic in all_topics}, include_defaults=True) @@ -1275,7 +1277,7 @@ def get_topics_resource(self): ]['latest_offset'] = partition['offset'] return topics - def get_acls_resource(self): + def get_acls_resource(self, params): """ Return a dict object containing information about acls, following this structure: @@ -1372,11 +1374,11 @@ def resource_to_func(self): 'acl': self.get_acls_resource } - def get_resource(self, resource): + def get_resource(self, resource, params): if resource not in self.resource_to_func: raise ValueError('Unexpected resource "%s"' % resource) - return self.resource_to_func[resource]() + return self.resource_to_func[resource](params) def ensure_topics(self, topics): topics_changed = set() diff --git a/molecule/default/tests/test_default.py b/molecule/default/tests/test_default.py index 52907233..ccb5140d 100644 --- a/molecule/default/tests/test_default.py +++ b/molecule/default/tests/test_default.py @@ -515,6 +515,7 @@ def test_kafka_info_topic(host): # Then for r in results: assert topic_name in r['ansible_module_results'] + assert '__consumer_offsets' not in r['ansible_module_results'] for name, topic_info in r['ansible_module_results'].items(): for partition, partition_info in topic_info.items(): assert 'earliest_offset' in partition_info @@ -535,6 +536,48 @@ def test_kafka_info_topic(host): assert not partition_info['unavailable_partition'] +def test_kafka_info_topic_include_internal(host): + """ + Check if can get info on topic + """ + # Given + topic_name = get_topic_name() + ensure_topic( + host, + topic_defaut_configuration, + topic_name + ) + time.sleep(0.3) + + topic_test_name = get_topic_name() + topic_test_configuration = topic_defaut_configuration.copy() + topic_test_configuration.update({ + 'options': { + 'min.insync.replicas': 2 + } + }) + ensure_topic( + host, + topic_test_configuration, + topic_test_name + ) + time.sleep(0.3) + produce_and_consume_topic(topic_name, 10, get_consumer_group()) + time.sleep(0.3) + # When + results = call_kafka_info( + host, + { + 'resource': 'topic', + 'include_internal': True + } + ) + # Then + for r in results: + assert topic_name in r['ansible_module_results'] + assert '__consumer_offsets' in r['ansible_module_results'] + + def test_kafka_info_topics_config(host): """ Check if can get config on topic. @@ -566,6 +609,42 @@ def test_kafka_info_topics_config(host): for name, topic_config in r['ansible_module_results'].items(): if name == topic_name: assert int(topic_config['retention.ms']) == 66574936 + assert 'min.insync.replicas' in topic_config + + +def test_kafka_info_topics_config_not_include_defaults(host): + """ + Check if can get config on topic. + """ + # Given + topic_name = get_topic_name() + test_topic_configuration = topic_defaut_configuration.copy() + test_topic_configuration.update({ + 'options': { + 'retention.ms': 66574936 + } + }) + ensure_topic( + host, + test_topic_configuration, + topic_name + ) + time.sleep(0.3) + # When + results = call_kafka_info( + host, + { + 'resource': 'topic-config', + 'include_defaults': False + }, + ) + # Then + for r in results: + assert topic_name in r['ansible_module_results'] + for name, topic_config in r['ansible_module_results'].items(): + if name == topic_name: + assert int(topic_config['retention.ms']) == 66574936 + assert 'min.insync.replicas' not in topic_config def test_kafka_info_brokers(host):