Skip to content

Commit

Permalink
Partition batched creations / alter / delete to support kraft batched…
Browse files Browse the repository at this point in the history
… operations
  • Loading branch information
Pierre MORVAN committed Oct 2, 2023
1 parent b80945c commit 86686e2
Showing 1 changed file with 91 additions and 68 deletions.
159 changes: 91 additions & 68 deletions module_utils/kafka_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
)


# Max entries that can be inserted inside one kraft entry
# @see QuorumController.java
KRAFT_MAX_OPERATIONS = 10000


class KafkaManager:
"""
A class used to interact with Kafka and Zookeeper
Expand Down Expand Up @@ -156,28 +161,30 @@ def create_topics(self, topics):
Creates a topic
Usable for Kafka version >= 0.10.1
"""
request = CreateTopicsRequest_v0(
requests = [CreateTopicsRequest_v0(
create_topic_requests=[(
topic['name'],
topic['partitions'],
topic['replica_factor'],
topic['replica_assignment']
if 'replica_assignment' in topic else [],
topic['options'].items() if 'options' in topic else []
) for topic in topics],
) for topic in partitioned_topics],
timeout=self.request_timeout_ms
)
response = self.send_request_and_get_response(request)
) for partitioned_topics in
self._list_partition_by(topics, KRAFT_MAX_OPERATIONS)]
for request in requests:
response = self.send_request_and_get_response(request)

for topic, error_code in response.topic_errors:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while creating topic %s. '
'Error key is %s, %s.' % (
topic, kafka.errors.for_code(error_code).message,
kafka.errors.for_code(error_code).description
for topic, error_code in response.topic_errors:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while creating topic %s. '
'Error key is %s, %s.' % (
topic, kafka.errors.for_code(error_code).message,
kafka.errors.for_code(error_code).description
)
)
)

def delete_topics(self, topics):
"""
Expand Down Expand Up @@ -414,52 +421,62 @@ def describe_acls(self, acl_resource, api_version):

return acl_list

def _list_partition_by(self, lst, size):
for i in range(0, len(lst), size):
yield list(itertools.islice(lst, i, i + size))

def create_acls(self, acl_resources, api_version):
"""Create a set of ACLs"""

if api_version < parse_version('2.0.0'):
request = CreateAclsRequest_v0(
requests = [CreateAclsRequest_v0(
creations=[self._convert_create_acls_resource_request_v0(
acl_resource) for acl_resource in acl_resources]
)
)]
else:
request = CreateAclsRequest_v1(
requests = [CreateAclsRequest_v1(
creations=[self._convert_create_acls_resource_request_v1(
acl_resource) for acl_resource in acl_resources]
)
response = self.send_request_and_get_response(request)
acl_resource) for acl_resource
in partitioned_acl_resources]
) for partitioned_acl_resources in
self._list_partition_by(acl_resources, KRAFT_MAX_OPERATIONS)]
for request in requests:
response = self.send_request_and_get_response(request)

for error_code, error_message in response.creation_responses:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while creating ACL %s. Error %s: %s.' % (
acl_resources, error_code, error_message
for error_code, error_message in response.creation_responses:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while creating ACL %s. Error %s: %s.' % (
acl_resources, error_code, error_message
)
)
)

def delete_acls(self, acl_resources, api_version):
"""Delete a set of ACLSs"""

if api_version < parse_version('2.0.0'):
request = DeleteAclsRequest_v0(
requests = [DeleteAclsRequest_v0(
filters=[self._convert_delete_acls_resource_request_v0(
acl_resource) for acl_resource in acl_resources]
)
)]
else:
request = DeleteAclsRequest_v1(
requests = [DeleteAclsRequest_v1(
filters=[self._convert_delete_acls_resource_request_v1(
acl_resource) for acl_resource in acl_resources]
)
acl_resource) for acl_resource in
partitioned_acl_resources]
) for partitioned_acl_resources in
self._list_partition_by(acl_resources, KRAFT_MAX_OPERATIONS)]

response = self.send_request_and_get_response(request)
for request in requests:
response = self.send_request_and_get_response(request)

for error_code, error_message, _ in response.filter_responses:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while deleting ACL %s. Error %s: %s.' % (
acl_resources, error_code, error_message
for error_code, error_message, _ in response.filter_responses:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while deleting ACL %s. Error %s: %s.' % (
acl_resources, error_code, error_message
)
)
)

def send_request_and_get_response(self, request, node_id=None):
"""
Expand Down Expand Up @@ -1613,32 +1630,36 @@ def _map_to_quota_resources(entries):
}
} for entry in entries]

@staticmethod
def _map_to_quota_request(entries):
return AlterClientQuotasRequest_v0(entries=[
(
[(
entity['entity_type'],
entity['entity_name']
) for entity in entry['entity']],
[(
key,
value,
True
) for key, value in entry['quotas_to_delete'].items()] +
[(
key,
value,
False
) for key, value in entry['quotas_to_alter'].items()] +
[(
key,
value,
False
) for key, value in entry['quotas_to_add'].items()]
)
for entry in entries
], validate_only=False)
def _map_to_quota_requests(self, entries):
return [AlterClientQuotasRequest_v0(entries=[
(
[(
entity['entity_type'],
entity['entity_name']
) for entity in entry['entity']],
[(
key,
value,
True
) for key, value in
entry['quotas_to_delete'].items()] +
[(
key,
value,
False
) for key, value in
entry['quotas_to_alter'].items()] +
[(
key,
value,
False
) for key, value in
entry['quotas_to_add'].items()]
)
for entry in partitioned_entries
], validate_only=False
) for partitioned_entries in
self._list_partition_by(entries, KRAFT_MAX_OPERATIONS)]

def describe_quotas(self):
if parse_version(self.get_api_version()) >= parse_version('2.6.0'):
Expand Down Expand Up @@ -1725,12 +1746,14 @@ def describe_quotas(self):

def alter_quotas(self, quotas):
if parse_version(self.get_api_version()) >= parse_version('2.6.0'):
request = self._map_to_quota_request(quotas)
response = self.send_request_and_get_response(request)
response_entries = response.to_object()['entries']
for response_entry in response_entries:
if response_entry['error_code'] != 0:
raise KafkaManagerError(response_entry['error_message'])
requests = self._map_to_quota_requests(quotas)
for request in requests:
response = self.send_request_and_get_response(request)
response_entries = response.to_object()['entries']
for response_entry in response_entries:
if response_entry['error_code'] != 0:
raise KafkaManagerError(
response_entry['error_message'])
else:
# Use zookeeper when kafka < 2.6.0
try:
Expand Down

0 comments on commit 86686e2

Please sign in to comment.