Skip to content

Commit

Permalink
CIRC-1958: Additional logging (#1378)
Browse files Browse the repository at this point in the history
* CIRC-1958 Logging in UniqueKafkaModuleIdProvider

* CIRC-1958 Logging in UniqueKafkaModuleIdProvider

(cherry picked from commit 7666b2f)
  • Loading branch information
OleksandrVidinieiev committed Nov 20, 2023
1 parent 8710717 commit 4b0df61
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private Future<KafkaConsumerWrapper<String, String>> createConsumer(DomainEventT


return moduleIdProvider.getModuleId()
.onSuccess(moduleId -> log.info("createConsumer:: moduleId={}", moduleId))
.compose(moduleId -> consumer.start(handler, moduleId))
.map(consumer)
.onSuccess(consumers::add);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.folio.circulation.services.events;

import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.folio.kafka.KafkaTopicNameHelper.formatGroupName;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import org.folio.circulation.domain.events.DomainEventType;
import org.folio.kafka.KafkaConfig;
Expand All @@ -16,7 +19,9 @@
import io.vertx.kafka.admin.ConsumerGroupDescription;
import io.vertx.kafka.admin.ConsumerGroupListing;
import io.vertx.kafka.admin.KafkaAdminClient;
import lombok.extern.log4j.Log4j2;

@Log4j2
public class UniqueKafkaModuleIdProvider implements ModuleIdProvider {
private final KafkaAdminClient kafkaAdminClient;
private final DomainEventType eventType;
Expand All @@ -31,53 +36,83 @@ public UniqueKafkaModuleIdProvider(Vertx vertx, KafkaConfig kafkaConfig, DomainE

@Override
public Future<String> getModuleId() {
log.info("getModuleId:: getting unique module ID: eventType={}", eventType);

return kafkaAdminClient.listConsumerGroups()
.map(this::extractConsumerGroupIds)
.compose(kafkaAdminClient::describeConsumerGroups)
.map(this::getUniqueModuleId);
}

private List<String> extractConsumerGroupIds(List<ConsumerGroupListing> groups) {
return groups.stream()
log.debug("extractConsumerGroupIds:: groups={}", groups);

List<String> existingGroupIds = groups.stream()
.map(ConsumerGroupListing::getGroupId)
.filter(id -> id.startsWith(formatGroupName(eventType.name(), REAL_MODULE_ID)))
.sorted()
.toList();

log.info("extractConsumerGroupIds:: existing consumer groups: {}", existingGroupIds);
return existingGroupIds;
}

private String getUniqueModuleId(Map<String, ConsumerGroupDescription> groups) {
log.debug("getUniqueModuleId:: groups={}", groups);

List<ConsumerGroupDescription> sortedGroups = groups.values()
.stream()
.sorted(comparing(ConsumerGroupDescription::getGroupId))
.toList();

log.info("getUniqueModuleId:: group sizes: {}", sortedGroups.stream()
.collect(toMap(ConsumerGroupDescription::getGroupId, group -> group.getMembers().size())));

return sortedGroups.stream()
.filter(group -> group.getMembers().isEmpty())
.findFirst()
.map(ConsumerGroupDescription::getGroupId)
.map(peek(emptyGroupId -> log.info("getUniqueModuleId:: empty group found: {}", emptyGroupId)))
.map(emptyGroupId -> emptyGroupId.substring(emptyGroupId.lastIndexOf(REAL_MODULE_ID)))
.orElseGet(() -> findUniqueModuleId(sortedGroups));
}

private String findUniqueModuleId(List<ConsumerGroupDescription> existingGroups) {
List<String> existingGroupIds = existingGroups.stream()
private String findUniqueModuleId(List<ConsumerGroupDescription> groups) {
log.debug("findUniqueModuleId:: groups={}", groups);

List<String> existingGroupIds = groups.stream()
.map(ConsumerGroupDescription::getGroupId)
.toList();

// in case list of group IDs has gaps
log.info("findUniqueModuleId:: looking for gaps in list of group IDs: {}", existingGroupIds);

for (int i = 0; i < existingGroupIds.size(); i++) {
String candidateModuleId = buildUniqueModuleId(i);
String candidateGroupId = formatGroupName(eventType.name(), candidateModuleId);
log.debug("findUniqueModuleId:: checking group ID: {}", candidateGroupId);
if (!existingGroupIds.get(i).equals(candidateGroupId)) {
log.info("findUniqueModuleId:: found a gap in list of group IDs, using module ID {}",
candidateModuleId);
return candidateModuleId;
}
}
// list of group IDs has no gaps
return buildUniqueModuleId(existingGroups.size());

final String newModuleId = buildUniqueModuleId(groups.size());
log.info("findUniqueModuleId:: found no gaps in list of group IDs, using module ID {}",
newModuleId);

return newModuleId;
}

private static String buildUniqueModuleId(int subgroupOrdinal) {
return String.format("%s-subgroup-%d", REAL_MODULE_ID, subgroupOrdinal);
}

private static <T> UnaryOperator<T> peek(Consumer<T> consumer) {
return x -> {
consumer.accept(x);
return x;
};
}

}

0 comments on commit 4b0df61

Please sign in to comment.