Skip to content

Commit

Permalink
CIRC-1958: Handle circulation rules update events (#1371)
Browse files Browse the repository at this point in the history
* CIRC-1958 Handle circulation rules update events

* CIRC-1958 Enable debug logging

* CIRC-1958 Fix event json parsing

* CIRC-1958 Refactoring

* CIRC-1958 Minor fixes

* CIRC-1958 Convert event payload to POJO

* CIRC-1958 Fix tests

* CIRC-1958 Fix tests

* CIRC-1958 Fix tests

* CIRC-1958 Another test

* CIRC-1958 Fix code smells

* CIRC-1958 Minor adjustments

* CIRC-1958 Minor adjustments

* CIRC-1958 Minor adjustments

* CIRC-1958 Fix logging messages

* CIRC-1958 Logic overhaul

* CIRC-1958 Minor improvements

* CIRC-1958 Warm up cache when activating tenant

* CIRC-1958 Fix test

* CIRC-1958 Event validations

* CIRC-1958 Refresh cache on GET /circulation/rules

* CIRC-1958 Fix tests

* CIRC-1958 Improve coverage

* CIRC-1958 Remove redundant check

* CIRC-1958 Remove redundant cache warm-up

* CIRC-1958 Improve logging

* CIRC-1958 Revert changes made for debugging

(cherry picked from commit 85112f7)
  • Loading branch information
OleksandrVidinieiev committed Nov 20, 2023
1 parent 663a410 commit 8710717
Show file tree
Hide file tree
Showing 30 changed files with 1,069 additions and 162 deletions.
30 changes: 18 additions & 12 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -669,17 +669,6 @@
"version": "1.0",
"interfaceType": "system",
"handlers": [
{
"methods": [
"POST"
],
"pathPattern": "/circulation/rules-reload",
"modulePermissions": [
"circulation-storage.circulation-rules.get"
],
"unit": "minute",
"delay": "3"
},
{
"methods": [
"POST"
Expand Down Expand Up @@ -1106,7 +1095,8 @@
"modulePermissions": [
"pubsub.event-types.post",
"pubsub.publishers.post",
"pubsub.subscribers.post"
"pubsub.subscribers.post",
"circulation-storage.circulation-rules.get"
]
},
{
Expand Down Expand Up @@ -2364,6 +2354,22 @@
{
"name": "SCHEDULED_ANONYMIZATION_NUMBER_OF_LOANS_TO_CHECK",
"value": "50000"
},
{
"name": "KAFKA_HOST",
"value": "kafka"
},
{
"name": "KAFKA_PORT",
"value": "9092"
},
{
"name": "REPLICATION_FACTOR",
"value": "1"
},
{
"name": "ENV",
"value": "folio"
}
]
}
Expand Down
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,21 @@
<artifactId>log4j-slf4j2-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>folio-kafka-wrapper</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<distributionManagement>
Expand Down
9 changes: 3 additions & 6 deletions src/main/java/org/folio/circulation/CirculationVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.circulation.resources.AddInfoResource;
import org.folio.circulation.resources.AllowedServicePointsResource;
import org.folio.circulation.resources.ChangeDueDateResource;
import org.folio.circulation.resources.AddInfoResource;
import org.folio.circulation.resources.CheckInByBarcodeResource;
import org.folio.circulation.resources.CheckOutByBarcodeResource;
import org.folio.circulation.resources.CirculationRulesReloadResource;
import org.folio.circulation.resources.CirculationRulesResource;
import org.folio.circulation.resources.ClaimItemReturnedResource;
import org.folio.circulation.resources.DeclareClaimedReturnedItemAsMissingResource;
Expand Down Expand Up @@ -38,14 +37,14 @@
import org.folio.circulation.resources.RequestQueueResource;
import org.folio.circulation.resources.RequestScheduledNoticeProcessingResource;
import org.folio.circulation.resources.ScheduledAnonymizationProcessingResource;
import org.folio.circulation.resources.ScheduledDigitalRemindersProcessingResource;
import org.folio.circulation.resources.TenantActivationResource;
import org.folio.circulation.resources.agedtolost.ScheduledAgeToLostFeeChargingResource;
import org.folio.circulation.resources.agedtolost.ScheduledAgeToLostResource;
import org.folio.circulation.resources.handlers.FeeFineBalanceChangedHandlerResource;
import org.folio.circulation.resources.handlers.LoanRelatedFeeFineClosedHandlerResource;
import org.folio.circulation.resources.renewal.RenewByBarcodeResource;
import org.folio.circulation.resources.renewal.RenewByIdResource;
import org.folio.circulation.resources.ScheduledDigitalRemindersProcessingResource;
import org.folio.circulation.support.logging.LogHelper;
import org.folio.circulation.support.logging.Logging;

Expand Down Expand Up @@ -79,7 +78,7 @@ public void start(Promise<Void> startFuture) {
.handler(rc -> LogHelper.logRequest(rc, log));

new HealthResource().register(router);
new TenantActivationResource().register(router);
new TenantActivationResource(client).register(router);

new CheckOutByBarcodeResource("/circulation/check-out-by-barcode", client).register(router);
new CheckInByBarcodeResource(client).register(router);
Expand All @@ -103,8 +102,6 @@ public void start(Promise<Void> startFuture) {

new CirculationRulesResource("/circulation/rules", client)
.register(router);
new CirculationRulesReloadResource("/circulation/rules-reload", client)
.register(router);
new LoanCirculationRulesEngineResource(
"/circulation/rules/loan-policy",
"/circulation/rules/loan-policy-all", client)
Expand Down
156 changes: 156 additions & 0 deletions src/main/java/org/folio/circulation/EventConsumerVerticle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package org.folio.circulation;

import static java.lang.System.getenv;
import static org.folio.circulation.domain.events.DomainEventType.CIRCULATION_RULES_UPDATED;
import static org.folio.circulation.support.kafka.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.circulation.support.kafka.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.circulation.support.kafka.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
import static org.folio.circulation.support.kafka.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.circulation.support.kafka.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
import static org.folio.circulation.support.kafka.KafkaConfigConstants.OKAPI_URL;

import java.util.ArrayList;
import java.util.List;

import org.folio.circulation.domain.events.DomainEventType;
import org.folio.circulation.services.events.CirculationRulesUpdateEventHandler;
import org.folio.circulation.services.events.DefaultModuleIdProvider;
import org.folio.circulation.services.events.ModuleIdProvider;
import org.folio.circulation.services.events.UniqueKafkaModuleIdProvider;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.kafka.services.KafkaEnvironmentProperties;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import lombok.extern.log4j.Log4j2;

@Log4j2
public class EventConsumerVerticle extends AbstractVerticle {

private static final int DEFAULT_LOAD_LIMIT = 5;
private static final String TENANT_ID_PATTERN = "\\w+";
private static final String DEFAULT_OKAPI_URL = "http://okapi:9130";
private static final int DEFAULT_KAFKA_MAX_REQUEST_SIZE = 4000000;

private final List<KafkaConsumerWrapper<String, String>> consumers = new ArrayList<>();
private KafkaConfig kafkaConfig;

@Override
public void init(Vertx vertx, Context context) {
super.init(vertx, context);
kafkaConfig = buildKafkaConfig();
}

@Override
public void start(Promise<Void> promise) {
log.info("start:: starting verticle");

createConsumers()
.onSuccess(v -> log.info("start:: verticle started"))
.onFailure(t -> log.error("start:: verticle start failed", t))
.onComplete(promise);
}

@Override
public void stop(Promise<Void> promise) {
log.info("stop:: stopping verticle");

stopConsumers()
.onSuccess(v -> log.info("stop:: verticle stopped"))
.onFailure(t -> log.error("stop:: verticle stop failed", t))
.onComplete(promise);
}

private Future<Void> stopConsumers() {
log.info("stopConsumers:: stopping consumers");
return Future.all(
consumers.stream()
.map(KafkaConsumerWrapper::stop)
.toList())
.onSuccess(v -> log.info("stopConsumers:: event consumers stopped"))
.onFailure(t -> log.error("stopConsumers:: failed to stop event consumers", t))
.mapEmpty();
}

private Future<Void> createConsumers() {
log.info("createConsumers:: creating consumers");
return Future.all(List.of(
createConsumer(CIRCULATION_RULES_UPDATED, new CirculationRulesUpdateEventHandler(),
// puts consumers into separate groups so that they all receive the same event
new UniqueKafkaModuleIdProvider(vertx, kafkaConfig, CIRCULATION_RULES_UPDATED))
)).mapEmpty();
}

private Future<KafkaConsumerWrapper<String, String>> createConsumer(DomainEventType eventType,
AsyncRecordHandler<String, String> handler) {

return createConsumer(eventType, handler, new DefaultModuleIdProvider());
}

private Future<KafkaConsumerWrapper<String, String>> createConsumer(DomainEventType eventType,
AsyncRecordHandler<String, String> handler, ModuleIdProvider moduleIdProvider) {

log.info("createConsumer:: creating consumer for event type {}", eventType);

KafkaConsumerWrapper<String, String> consumer = KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(kafkaConfig)
.loadLimit(DEFAULT_LOAD_LIMIT)
.globalLoadSensor(new GlobalLoadSensor())
.subscriptionDefinition(buildSubscriptionDefinition(eventType))
.processRecordErrorHandler((t, r) -> log.error("Failed to process event: {}", r, t))
.build();


return moduleIdProvider.getModuleId()
.compose(moduleId -> consumer.start(handler, moduleId))
.map(consumer)
.onSuccess(consumers::add);
}

private static SubscriptionDefinition buildSubscriptionDefinition(DomainEventType eventType) {
return SubscriptionDefinition.builder()
.eventType(eventType.name())
.subscriptionPattern(eventType.getKafkaTopic().fullTopicName(TENANT_ID_PATTERN))
.build();
}

private KafkaConfig buildKafkaConfig() {
log.info("buildKafkaConfig:: building Kafka config");
final JsonObject vertxConfig = config();

KafkaConfig config = KafkaConfig.builder()
.envId(vertxConfig.getString(KAFKA_ENV))
.kafkaHost(vertxConfig.getString(KAFKA_HOST))
.kafkaPort(vertxConfig.getString(KAFKA_PORT))
.okapiUrl(vertxConfig.getString(OKAPI_URL))
.replicationFactor(Integer.parseInt(vertxConfig.getString(KAFKA_REPLICATION_FACTOR, "1")))
.maxRequestSize(Integer.parseInt(vertxConfig.getString(KAFKA_MAX_REQUEST_SIZE, "10")))
.build();

log.info("buildKafkaConfig:: {}", config);
return config;
}

public static JsonObject buildConfig() {
log.info("buildConfig:: building config for {}", EventConsumerVerticle.class.getSimpleName());
return new JsonObject()
.put(KAFKA_HOST, KafkaEnvironmentProperties.host())
.put(KAFKA_PORT, KafkaEnvironmentProperties.port())
.put(KAFKA_REPLICATION_FACTOR, KafkaEnvironmentProperties.replicationFactor())
.put(KAFKA_ENV, KafkaEnvironmentProperties.environment())
.put(OKAPI_URL, getenv().getOrDefault(OKAPI_URL, DEFAULT_OKAPI_URL))
.put(KAFKA_MAX_REQUEST_SIZE, getenv().getOrDefault(KAFKA_MAX_REQUEST_SIZE,
String.valueOf(DEFAULT_KAFKA_MAX_REQUEST_SIZE)));
}

}
41 changes: 28 additions & 13 deletions src/main/java/org/folio/circulation/Launcher.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.folio.circulation;

import static org.folio.circulation.support.json.JsonPropertyWriter.write;

import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -13,18 +13,20 @@
import org.folio.circulation.support.VertxAssistant;
import org.folio.circulation.support.logging.Logging;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;

public class Launcher {
private final VertxAssistant vertxAssistant;
private final Logger log;
private String moduleDeploymentId;
private final Set<String> deploymentIds;

public Launcher(VertxAssistant vertxAssistant) {
Logging.initialiseFormat();

this.vertxAssistant = vertxAssistant;
this.log = LogManager.getLogger(MethodHandles.lookup().lookupClass());
this.deploymentIds = new HashSet<>();
}

public static void main(String[] args) throws
Expand All @@ -51,27 +53,40 @@ private void stop() {
}

public CompletableFuture<Void> undeploy() {
return vertxAssistant.undeployVerticle(moduleDeploymentId);
return undeployVerticles();
}

public CompletableFuture<Void> start(Integer port) {

if(port == null) {
throw new IllegalArgumentException("port should not be null");
}

vertxAssistant.start();

log.info("Server Starting");
log.info("start:: server starting");

return deployVerticle(CirculationVerticle.class, new JsonObject().put("port", port))
.thenAccept(result -> log.info("start:: server started"))
.thenCompose(v -> deployVerticle(EventConsumerVerticle.class, EventConsumerVerticle.buildConfig()));
}

JsonObject config = new JsonObject();
write(config, "port", port);
private CompletableFuture<Void> deployVerticle(Class<? extends AbstractVerticle> verticleClass,
JsonObject config) {

CompletableFuture<String> deployed =
vertxAssistant.deployVerticle(CirculationVerticle.class, config);
return vertxAssistant.deployVerticle(verticleClass, config)
.thenAccept(deploymentIds::add)
.thenAccept(r -> log.info("deployVerticle:: verticle deployed: {}", verticleClass.getSimpleName()))
.exceptionally(t -> {
log.error("deployVerticle:: deployment failed: {}", verticleClass.getSimpleName(), t);
return null;
});
}

return deployed
.thenApply(result -> moduleDeploymentId = result)
.thenAccept(result -> log.info("Server Started"));
private CompletableFuture<Void> undeployVerticles() {
return CompletableFuture.allOf(
deploymentIds.stream()
.map(vertxAssistant::undeployVerticle)
.toArray(CompletableFuture[]::new));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.folio.circulation.domain.events;

import org.folio.kafka.services.KafkaTopic;

import lombok.AllArgsConstructor;

@AllArgsConstructor
public enum CirculationStorageKafkaTopic implements KafkaTopic {
CIRCULATION_RULES("rules", 10);

private final String topic;
private final int partitions;

@Override
public String moduleName() {
return "circulation";
}

@Override
public String topicName() {
return topic;
}

@Override
public int numPartitions() {
return partitions;
}
}

Loading

0 comments on commit 8710717

Please sign in to comment.