diff --git a/src/main/java/org/folio/circulation/EventConsumerVerticle.java b/src/main/java/org/folio/circulation/EventConsumerVerticle.java index 49cdf461f7..64e6516f57 100644 --- a/src/main/java/org/folio/circulation/EventConsumerVerticle.java +++ b/src/main/java/org/folio/circulation/EventConsumerVerticle.java @@ -112,6 +112,7 @@ private Future> createConsumer(DomainEventT return moduleIdProvider.getModuleId() + .onSuccess(moduleId -> log.info("createConsumer:: moduleId={}", moduleId)) .compose(moduleId -> consumer.start(handler, moduleId)) .map(consumer) .onSuccess(consumers::add); diff --git a/src/main/java/org/folio/circulation/services/events/UniqueKafkaModuleIdProvider.java b/src/main/java/org/folio/circulation/services/events/UniqueKafkaModuleIdProvider.java index 191659eb6c..f4925580ab 100644 --- a/src/main/java/org/folio/circulation/services/events/UniqueKafkaModuleIdProvider.java +++ b/src/main/java/org/folio/circulation/services/events/UniqueKafkaModuleIdProvider.java @@ -1,12 +1,15 @@ package org.folio.circulation.services.events; import static java.util.Comparator.comparing; +import static java.util.stream.Collectors.toMap; import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.folio.kafka.KafkaTopicNameHelper.formatGroupName; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.function.Consumer; +import java.util.function.UnaryOperator; import org.folio.circulation.domain.events.DomainEventType; import org.folio.kafka.KafkaConfig; @@ -16,7 +19,9 @@ import io.vertx.kafka.admin.ConsumerGroupDescription; import io.vertx.kafka.admin.ConsumerGroupListing; import io.vertx.kafka.admin.KafkaAdminClient; +import lombok.extern.log4j.Log4j2; +@Log4j2 public class UniqueKafkaModuleIdProvider implements ModuleIdProvider { private final KafkaAdminClient kafkaAdminClient; private final DomainEventType eventType; @@ -31,6 +36,8 @@ public UniqueKafkaModuleIdProvider(Vertx vertx, KafkaConfig kafkaConfig, DomainE @Override public Future getModuleId() { + log.info("getModuleId:: getting unique module ID: eventType={}", eventType); + return kafkaAdminClient.listConsumerGroups() .map(this::extractConsumerGroupIds) .compose(kafkaAdminClient::describeConsumerGroups) @@ -38,46 +45,74 @@ public Future getModuleId() { } private List extractConsumerGroupIds(List groups) { - return groups.stream() + log.debug("extractConsumerGroupIds:: groups={}", groups); + + List existingGroupIds = groups.stream() .map(ConsumerGroupListing::getGroupId) .filter(id -> id.startsWith(formatGroupName(eventType.name(), REAL_MODULE_ID))) .sorted() .toList(); + + log.info("extractConsumerGroupIds:: existing consumer groups: {}", existingGroupIds); + return existingGroupIds; } private String getUniqueModuleId(Map groups) { + log.debug("getUniqueModuleId:: groups={}", groups); + List sortedGroups = groups.values() .stream() .sorted(comparing(ConsumerGroupDescription::getGroupId)) .toList(); + log.info("getUniqueModuleId:: group sizes: {}", sortedGroups.stream() + .collect(toMap(ConsumerGroupDescription::getGroupId, group -> group.getMembers().size()))); + return sortedGroups.stream() .filter(group -> group.getMembers().isEmpty()) .findFirst() .map(ConsumerGroupDescription::getGroupId) + .map(peek(emptyGroupId -> log.info("getUniqueModuleId:: empty group found: {}", emptyGroupId))) .map(emptyGroupId -> emptyGroupId.substring(emptyGroupId.lastIndexOf(REAL_MODULE_ID))) .orElseGet(() -> findUniqueModuleId(sortedGroups)); } - private String findUniqueModuleId(List existingGroups) { - List existingGroupIds = existingGroups.stream() + private String findUniqueModuleId(List groups) { + log.debug("findUniqueModuleId:: groups={}", groups); + + List existingGroupIds = groups.stream() .map(ConsumerGroupDescription::getGroupId) .toList(); - // in case list of group IDs has gaps + log.info("findUniqueModuleId:: looking for gaps in list of group IDs: {}", existingGroupIds); + for (int i = 0; i < existingGroupIds.size(); i++) { String candidateModuleId = buildUniqueModuleId(i); String candidateGroupId = formatGroupName(eventType.name(), candidateModuleId); + log.debug("findUniqueModuleId:: checking group ID: {}", candidateGroupId); if (!existingGroupIds.get(i).equals(candidateGroupId)) { + log.info("findUniqueModuleId:: found a gap in list of group IDs, using module ID {}", + candidateModuleId); return candidateModuleId; } } - // list of group IDs has no gaps - return buildUniqueModuleId(existingGroups.size()); + + final String newModuleId = buildUniqueModuleId(groups.size()); + log.info("findUniqueModuleId:: found no gaps in list of group IDs, using module ID {}", + newModuleId); + + return newModuleId; } private static String buildUniqueModuleId(int subgroupOrdinal) { return String.format("%s-subgroup-%d", REAL_MODULE_ID, subgroupOrdinal); } + private static UnaryOperator peek(Consumer consumer) { + return x -> { + consumer.accept(x); + return x; + }; + } + }