diff --git a/descriptors/ModuleDescriptor-template.json b/descriptors/ModuleDescriptor-template.json
index d243dd63e..12109a5ec 100644
--- a/descriptors/ModuleDescriptor-template.json
+++ b/descriptors/ModuleDescriptor-template.json
@@ -1,6 +1,24 @@
{
"id": "${artifactId}-${version}",
"name": "Circulation Storage Module",
+ "requires": [
+ {
+ "id": "pubsub-event-types",
+ "version": "0.1"
+ },
+ {
+ "id": "pubsub-publishers",
+ "version": "0.1"
+ },
+ {
+ "id": "pubsub-subscribers",
+ "version": "0.1"
+ },
+ {
+ "id": "pubsub-publish",
+ "version": "0.1"
+ }
+ ],
"provides": [
{
"id": "request-storage-batch",
@@ -379,7 +397,12 @@
"handlers": [
{
"methods": ["POST"],
- "pathPattern": "/_/tenant"
+ "pathPattern": "/_/tenant",
+ "modulePermissions": [
+ "pubsub.event-types.post",
+ "pubsub.publishers.post",
+ "pubsub.subscribers.post"
+ ]
}, {
"methods": ["DELETE"],
"pathPattern": "/_/tenant"
@@ -449,6 +472,24 @@
]
}
]
+ },
+ {
+ "id": "_timer",
+ "version": "1.0",
+ "interfaceType": "system",
+ "handlers": [
+ {
+ "methods": [
+ "POST"
+ ],
+ "pathPattern": "/request-expiration/scheduled-request-expiration",
+ "modulePermissions": [
+ "pubsub.publish.post"
+ ],
+ "unit": "minute",
+ "delay": "2"
+ }
+ ]
}
],
"permissionSets": [
diff --git a/pom.xml b/pom.xml
index 8909af283..78840eb75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,6 @@
io.vertx
vertx-web-client
- test
org.glassfish.web
@@ -114,6 +113,35 @@
${rest-assured.version}
test
+
+ org.folio
+ mod-pubsub-client
+ 1.3.1
+
+
+ org.hamcrest
+ hamcrest-all
+
+
+
+
+ org.awaitility
+ awaitility
+ 4.0.3
+ test
+
+
+ com.github.tomakehurst
+ wiremock
+ 2.24.0
+
+
+ net.sf.jopt-simple
+ jopt-simple
+
+
+ test
+
diff --git a/src/main/java/org/folio/rest/impl/RequestExpiryImpl.java b/src/main/java/org/folio/rest/impl/RequestExpiryImpl.java
index 623c2973b..d929caaa1 100644
--- a/src/main/java/org/folio/rest/impl/RequestExpiryImpl.java
+++ b/src/main/java/org/folio/rest/impl/RequestExpiryImpl.java
@@ -1,24 +1,36 @@
+
package org.folio.rest.impl;
-import static org.folio.rest.RestVerticle.MODULE_SPECIFIC_ARGS;
+import static io.vertx.core.Future.succeededFuture;
+import static org.folio.rest.jaxrs.resource.ScheduledRequestExpiration.ScheduledRequestExpirationResponse.respond204;
+import static org.folio.rest.jaxrs.resource.ScheduledRequestExpiration.ScheduledRequestExpirationResponse.respond500WithTextPlain;
+import static org.folio.support.ExpirationTool.doRequestExpiration;
-import io.vertx.core.Context;
-import io.vertx.core.Vertx;
+import java.util.Map;
-import org.folio.rest.resource.interfaces.PeriodicAPI;
-import org.folio.support.ExpirationTool;
+import javax.ws.rs.core.Response;
-public class RequestExpiryImpl implements PeriodicAPI {
+import org.folio.rest.jaxrs.resource.ScheduledRequestExpiration;
- @Override
- public long runEvery() {
- String intervalString = MODULE_SPECIFIC_ARGS.getOrDefault("request.expire.interval",
- "120000");
- return Long.parseLong(intervalString);
- }
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
+import io.vertx.core.Handler;
+
+public class RequestExpiryImpl implements ScheduledRequestExpiration {
@Override
- public void run(Vertx vertx, Context context) {
- context.runOnContext(v -> ExpirationTool.doRequestExpiration(vertx));
+ public void expireRequests(Map okapiHeaders,
+ Handler> asyncResultHandler, Context context) {
+
+ context.runOnContext(v -> doRequestExpiration(okapiHeaders, context.owner())
+ .onComplete(result -> {
+ if (result.succeeded()) {
+ asyncResultHandler.handle(succeededFuture(respond204()));
+ } else {
+ asyncResultHandler.handle(
+ succeededFuture(respond500WithTextPlain(result.cause().getMessage())));
+ }
+ })
+ );
}
}
diff --git a/src/main/java/org/folio/rest/impl/TenantRefAPI.java b/src/main/java/org/folio/rest/impl/TenantRefAPI.java
index ff928d93b..8986f13b0 100644
--- a/src/main/java/org/folio/rest/impl/TenantRefAPI.java
+++ b/src/main/java/org/folio/rest/impl/TenantRefAPI.java
@@ -10,6 +10,7 @@
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import org.folio.rest.tools.utils.TenantLoading;
+import org.folio.service.PubSubRegistrationService;
public class TenantRefAPI extends TenantAPI {
private static final Logger log = LoggerFactory.getLogger(TenantRefAPI.class);
@@ -44,11 +45,17 @@ public void postTenant(TenantAttributes ta, Map headers,
.respond500WithTextPlain(res1.cause().getLocalizedMessage())));
return;
}
- hndlr.handle(io.vertx.core.Future.succeededFuture(PostTenantResponse
- .respond201WithApplicationJson("")));
+ PubSubRegistrationService.registerModule(headers, vertx)
+ .whenComplete((aBoolean, throwable) ->
+ hndlr.handle(io.vertx.core.Future.succeededFuture(PostTenantResponse
+ .respond201WithApplicationJson(""))));
});
}, cntxt);
}
-
+ @Override
+ public void deleteTenant(Map headers, Handler> handlers, Context cntx) {
+ PubSubRegistrationService.unregisterModule(headers, cntx.owner())
+ .thenRun(() -> super.deleteTenant(headers, handlers, cntx));
+ }
}
diff --git a/src/main/java/org/folio/rest/jaxrs/resource/ScheduledRequestExpiration.java b/src/main/java/org/folio/rest/jaxrs/resource/ScheduledRequestExpiration.java
new file mode 100644
index 000000000..8124641d9
--- /dev/null
+++ b/src/main/java/org/folio/rest/jaxrs/resource/ScheduledRequestExpiration.java
@@ -0,0 +1,48 @@
+package org.folio.rest.jaxrs.resource;
+
+import java.util.Map;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+import org.folio.rest.jaxrs.resource.support.ResponseDelegate;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
+import io.vertx.core.Handler;
+
+import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
+import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
+
+@Path("/scheduled-request-expiration")
+public interface ScheduledRequestExpiration {
+
+ @POST
+ @Produces("text/plain")
+ void expireRequests(Map okapiHeaders,
+ Handler> asyncResultHandler, Context vertxContext);
+
+ class ScheduledRequestExpirationResponse extends ResponseDelegate {
+ private ScheduledRequestExpirationResponse(Response response, Object entity) {
+ super(response, entity);
+ }
+
+ private ScheduledRequestExpirationResponse(Response response) {
+ super(response);
+ }
+
+ public static ScheduledRequestExpirationResponse respond204() {
+ Response.ResponseBuilder responseBuilder = Response.status(204);
+ return new ScheduledRequestExpirationResponse(responseBuilder.build());
+ }
+
+ public static ScheduledRequestExpirationResponse respond500WithTextPlain(String reason) {
+ Response.ResponseBuilder responseBuilder = Response.status(500).header(CONTENT_TYPE, TEXT_PLAIN);
+ responseBuilder.entity(reason);
+
+ return new ScheduledRequestExpirationResponse(responseBuilder.build(), reason);
+ }
+ }
+}
diff --git a/src/main/java/org/folio/service/EventPublisherService.java b/src/main/java/org/folio/service/EventPublisherService.java
new file mode 100644
index 000000000..09a740dc9
--- /dev/null
+++ b/src/main/java/org/folio/service/EventPublisherService.java
@@ -0,0 +1,34 @@
+package org.folio.service;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import org.folio.support.EventType;
+import org.folio.support.JsonPropertyWriter;
+import org.folio.support.LogEventPayloadField;
+import org.folio.support.exception.LogEventType;
+
+import java.util.Map;
+
+import static org.folio.support.JsonPropertyWriter.write;
+import static org.folio.support.LogEventPayloadField.LOG_EVENT_TYPE;
+
+public class EventPublisherService {
+
+ private final PubSubPublishingService pubSubPublishingService;
+
+ public EventPublisherService(Vertx vertx, Map okapiHeaders) {
+ pubSubPublishingService = new PubSubPublishingService(vertx, okapiHeaders);
+ }
+
+ public Future publishLogRecord(JsonObject context, LogEventType payloadType) {
+ context = new JsonObject().put(LogEventPayloadField.PAYLOAD.value(), context);
+ write(context, LOG_EVENT_TYPE.value(), payloadType.value());
+ Promise promise = Promise.promise();
+ pubSubPublishingService.publishEvent(EventType.LOG_RECORD.name(), context.encode())
+ .thenAccept(r -> promise.complete());
+ return promise.future();
+ }
+
+}
diff --git a/src/main/java/org/folio/service/PubSubPublishingService.java b/src/main/java/org/folio/service/PubSubPublishingService.java
new file mode 100644
index 000000000..9356b04e7
--- /dev/null
+++ b/src/main/java/org/folio/service/PubSubPublishingService.java
@@ -0,0 +1,67 @@
+package org.folio.service;
+
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import org.folio.rest.jaxrs.model.Event;
+import org.folio.rest.jaxrs.model.EventMetadata;
+import org.folio.rest.util.OkapiConnectionParams;
+import org.folio.util.pubsub.PubSubClientUtils;
+
+import io.vertx.core.Context;
+import io.vertx.core.Vertx;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+
+class PubSubPublishingService {
+ private static final Logger logger = LoggerFactory.getLogger(PubSubPublishingService.class);
+
+ private final Map okapiHeaders;
+ private final Context context;
+
+ public PubSubPublishingService(Vertx vertx, Map okapiHeaders) {
+ context = vertx.getOrCreateContext();
+ this.okapiHeaders = okapiHeaders;
+ }
+
+ public CompletableFuture publishEvent(String eventType, String payload) {
+ Event event = new Event().withId(UUID.randomUUID()
+ .toString())
+ .withEventType(eventType)
+ .withEventPayload(payload)
+ .withEventMetadata(new EventMetadata().withPublishedBy(PubSubClientUtils.constructModuleName())
+ .withTenantId(okapiHeaders.get(OKAPI_TENANT_HEADER))
+ .withEventTTL(1));
+
+ final CompletableFuture publishResult = new CompletableFuture<>();
+ OkapiConnectionParams params = new OkapiConnectionParams();
+ params.setOkapiUrl(okapiHeaders.get(OKAPI_URL_HEADER));
+ params.setTenantId(okapiHeaders.get(OKAPI_TENANT_HEADER));
+ params.setToken(okapiHeaders.get(OKAPI_TOKEN_HEADER));
+
+ context.runOnContext(v -> PubSubClientUtils.sendEventMessage(event, params)
+ .whenComplete((result, throwable) -> {
+ if (Boolean.TRUE.equals(result)) {
+ logger.debug("Event published successfully. ID: {}, type: {}, payload: {}", event.getId(), event.getEventType(),
+ event.getEventPayload());
+ publishResult.complete(true);
+ } else {
+ logger.error("Failed to publish event. ID: {}, type: {}, payload: {}", throwable, event.getId(), event.getEventType(),
+ event.getEventPayload());
+
+ if (throwable == null) {
+ publishResult.complete(false);
+ } else {
+ publishResult.completeExceptionally(throwable);
+ }
+ }
+ }));
+
+ return publishResult;
+ }
+}
diff --git a/src/main/java/org/folio/service/PubSubRegistrationService.java b/src/main/java/org/folio/service/PubSubRegistrationService.java
new file mode 100644
index 000000000..794cd0adf
--- /dev/null
+++ b/src/main/java/org/folio/service/PubSubRegistrationService.java
@@ -0,0 +1,71 @@
+package org.folio.service;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static org.folio.HttpStatus.HTTP_NO_CONTENT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.folio.rest.client.PubsubClient;
+import org.folio.rest.util.OkapiConnectionParams;
+import org.folio.support.EventType;
+import org.folio.support.exception.ModulePubSubUnregisteringException;
+import org.folio.util.pubsub.PubSubClientUtils;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+
+public class PubSubRegistrationService {
+ private static final Logger logger = LoggerFactory.getLogger(PubSubRegistrationService.class);
+
+ private PubSubRegistrationService() {
+ throw new IllegalStateException();
+ }
+
+ public static CompletableFuture registerModule(Map headers, Vertx vertx) {
+
+ return PubSubClientUtils.registerModule(new OkapiConnectionParams(headers, vertx))
+ .whenComplete((registrationAr, throwable) -> {
+ if (throwable == null) {
+ logger.info("Module was successfully registered as publisher/subscriber in mod-pubsub");
+ } else {
+ logger.error("Error during module registration in mod-pubsub", throwable);
+ }
+ });
+ }
+
+ public static CompletableFuture unregisterModule(Map headers, Vertx vertx) {
+
+ List> list = new ArrayList<>();
+
+ OkapiConnectionParams params = new OkapiConnectionParams(headers, vertx);
+ PubsubClient client = new PubsubClient(params.getOkapiUrl(), params.getTenantId(), params.getToken());
+
+ try {
+ for (EventType eventType : EventType.values()) {
+ CompletableFuture future = new CompletableFuture<>();
+ client.deletePubsubEventTypesPublishersByEventTypeName(eventType.name(), PubSubClientUtils.constructModuleName(), ar -> {
+ if (ar.statusCode() == HTTP_NO_CONTENT.toInt()) {
+ future.complete(true);
+ } else {
+ ModulePubSubUnregisteringException exception = new ModulePubSubUnregisteringException(
+ String.format("Module's publisher for event type %s was not unregistered from PubSub. HTTP status: %s",
+ eventType.name(), ar.statusCode()));
+ logger.error(exception);
+ future.completeExceptionally(exception);
+ }
+ });
+ list.add(future);
+ }
+ } catch (Exception exception) {
+ logger.error("Module's publishers were not unregistered from PubSub.", exception);
+ }
+
+ return allOf(list.toArray(new CompletableFuture[0])).thenApply(r -> true)
+ .whenComplete((r, e) -> client.close());
+ }
+
+}
diff --git a/src/main/java/org/folio/support/EventType.java b/src/main/java/org/folio/support/EventType.java
new file mode 100644
index 000000000..c46c46973
--- /dev/null
+++ b/src/main/java/org/folio/support/EventType.java
@@ -0,0 +1,6 @@
+
+package org.folio.support;
+
+public enum EventType {
+ LOG_RECORD
+}
diff --git a/src/main/java/org/folio/support/ExpirationTool.java b/src/main/java/org/folio/support/ExpirationTool.java
index 664ab64e0..0a86a31d0 100644
--- a/src/main/java/org/folio/support/ExpirationTool.java
+++ b/src/main/java/org/folio/support/ExpirationTool.java
@@ -12,10 +12,15 @@
import static org.folio.rest.jaxrs.model.Request.Status.OPEN_AWAITING_PICKUP;
import static org.folio.rest.jaxrs.model.Request.Status.OPEN_IN_TRANSIT;
import static org.folio.rest.jaxrs.model.Request.Status.OPEN_NOT_YET_FILLED;
+import static org.folio.support.LogEventPayloadField.ORIGINAL;
+import static org.folio.support.LogEventPayloadField.REQUESTS;
+import static org.folio.support.LogEventPayloadField.UPDATED;
+import static org.folio.support.exception.LogEventType.REQUEST_EXPIRED;
import static org.folio.support.DbUtil.rowSetToStream;
import java.text.SimpleDateFormat;
import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
@@ -40,6 +45,7 @@
import io.vertx.core.logging.LoggerFactory;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
+import org.folio.service.EventPublisherService;
public class ExpirationTool {
private static final Logger log = LoggerFactory.getLogger(ExpirationTool.class);
@@ -49,7 +55,7 @@ private ExpirationTool() {
//do nothing
}
- public static Future doRequestExpiration(Vertx vertx) {
+ public static Future doRequestExpiration(Map okapiHeaders, Vertx vertx) {
final Promise> promise = Promise.promise();
PostgresClient pgClient = PostgresClient.getInstance(vertx);
@@ -59,18 +65,20 @@ public static Future doRequestExpiration(Vertx vertx) {
return promise.future()
.compose(rs -> CompositeFuture.all(rowSetToStream(rs)
- .map(row -> doRequestExpirationForTenant(vertx, getTenant(row.getString("nspname"))))
+ .map(row -> doRequestExpirationForTenant(okapiHeaders, vertx, getTenant(row.getString("nspname"))))
.collect(toList()))
.map(all -> null));
}
- private static Future doRequestExpirationForTenant(Vertx vertx, String tenant) {
+ private static Future doRequestExpirationForTenant(Map okapiHeaders, Vertx vertx, String tenant) {
Promise promise = Promise.promise();
PostgresClient pgClient = PostgresClient.getInstance(vertx, tenant);
+ List context = new ArrayList<>();
+
pgClient.startTx(conn -> getExpiredRequests(conn, vertx, tenant)
- .compose(requests -> closeRequests(conn, vertx, tenant, requests))
+ .compose(requests -> closeRequests(conn, vertx, tenant, requests, context))
.compose(itemIds -> getOpenRequestsByItemIds(conn, vertx, tenant, itemIds))
.compose(requests -> reorderRequests(conn, vertx, tenant, requests))
.onComplete(v -> {
@@ -80,6 +88,9 @@ private static Future doRequestExpirationForTenant(Vertx vertx, String ten
promise.fail(v.cause());
});
} else {
+ EventPublisherService eventPublisherService = new EventPublisherService(vertx, okapiHeaders);
+ context.forEach(p -> eventPublisherService
+ .publishLogRecord(new JsonObject().put(REQUESTS.value(), p), REQUEST_EXPIRED));
pgClient.endTx(conn, done -> promise.complete());
}
}));
@@ -212,14 +223,18 @@ private static Future updateRequestsPositions(AsyncResult c
private static Future> closeRequests(AsyncResult conn,
- Vertx vertx, String tenant, List requests) {
+ Vertx vertx, String tenant, List requests, List context) {
Future future = succeededFuture();
Set closedRequestsItemIds = new HashSet<>();
for (Request request : requests) {
+ JsonObject pair = new JsonObject();
+ pair.put(ORIGINAL.value(), JsonObject.mapFrom(request));
closedRequestsItemIds.add(request.getItemId());
Request updatedRequest = changeRequestStatus(request).withPosition(null);
+ pair.put(UPDATED.value(), JsonObject.mapFrom(updatedRequest));
+ context.add(pair);
future = future.compose(v -> updateRequest(conn, vertx, tenant, updatedRequest));
}
diff --git a/src/main/java/org/folio/support/JsonPropertyWriter.java b/src/main/java/org/folio/support/JsonPropertyWriter.java
new file mode 100644
index 000000000..79b4ecb38
--- /dev/null
+++ b/src/main/java/org/folio/support/JsonPropertyWriter.java
@@ -0,0 +1,18 @@
+package org.folio.support;
+
+import org.apache.commons.lang3.StringUtils;
+
+import io.vertx.core.json.JsonObject;
+
+public class JsonPropertyWriter {
+
+ private JsonPropertyWriter() {
+ }
+
+ public static void write(JsonObject to, String propertyName, String value) {
+
+ if (StringUtils.isNotBlank(value)) {
+ to.put(propertyName, value);
+ }
+ }
+}
diff --git a/src/main/java/org/folio/support/LogEventPayloadField.java b/src/main/java/org/folio/support/LogEventPayloadField.java
new file mode 100644
index 000000000..3496a7fef
--- /dev/null
+++ b/src/main/java/org/folio/support/LogEventPayloadField.java
@@ -0,0 +1,20 @@
+package org.folio.support;
+
+public enum LogEventPayloadField {
+
+ PAYLOAD("payload"),
+ LOG_EVENT_TYPE("logEventType"),
+ REQUESTS("requests"),
+ ORIGINAL("original"),
+ UPDATED("updated");
+
+ private final String value;
+
+ LogEventPayloadField(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return this.value;
+ }
+}
diff --git a/src/main/java/org/folio/support/exception/LogEventType.java b/src/main/java/org/folio/support/exception/LogEventType.java
new file mode 100644
index 000000000..5fc7d3a2c
--- /dev/null
+++ b/src/main/java/org/folio/support/exception/LogEventType.java
@@ -0,0 +1,15 @@
+package org.folio.support.exception;
+
+public enum LogEventType {
+ REQUEST_EXPIRED("REQUEST_EXPIRED_EVENT");
+
+ private final String value;
+
+ LogEventType(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return this.value;
+ }
+}
diff --git a/src/main/java/org/folio/support/exception/ModulePubSubUnregisteringException.java b/src/main/java/org/folio/support/exception/ModulePubSubUnregisteringException.java
new file mode 100644
index 000000000..48de8a226
--- /dev/null
+++ b/src/main/java/org/folio/support/exception/ModulePubSubUnregisteringException.java
@@ -0,0 +1,7 @@
+package org.folio.support.exception;
+
+public class ModulePubSubUnregisteringException extends Exception {
+ public ModulePubSubUnregisteringException(String s) {
+ super(s);
+ }
+}
diff --git a/src/main/resources/MessagingDescriptor.json b/src/main/resources/MessagingDescriptor.json
new file mode 100644
index 000000000..b1981c677
--- /dev/null
+++ b/src/main/resources/MessagingDescriptor.json
@@ -0,0 +1,12 @@
+{
+ "publications": [
+ {
+ "eventType": "LOG_RECORD",
+ "description": "Created log record event",
+ "eventTTL": 1,
+ "signed": false
+ }
+ ],
+ "subscriptions": [
+ ]
+}
diff --git a/src/test/java/org/folio/rest/api/JsonPropertyWriterTest.java b/src/test/java/org/folio/rest/api/JsonPropertyWriterTest.java
new file mode 100644
index 000000000..f5c5860a7
--- /dev/null
+++ b/src/test/java/org/folio/rest/api/JsonPropertyWriterTest.java
@@ -0,0 +1,25 @@
+package org.folio.rest.api;
+
+import io.vertx.core.json.JsonObject;
+import org.folio.rest.support.ApiTests;
+import org.junit.Test;
+
+import static org.folio.support.JsonPropertyWriter.write;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class JsonPropertyWriterTest extends ApiTests {
+ @Test
+ public void writeExistingValue() {
+ JsonObject jsonObject = new JsonObject();
+ write(jsonObject, "key", "value");
+ assertThat(jsonObject.size(), is(1));
+ }
+
+ @Test
+ public void writeNullValue() {
+ JsonObject jsonObject = new JsonObject();
+ write(jsonObject, "key", null);
+ assertThat(jsonObject.size(), is(0));
+ }
+}
diff --git a/src/test/java/org/folio/rest/api/LoanPoliciesApiTest.java b/src/test/java/org/folio/rest/api/LoanPoliciesApiTest.java
index 90e234ca9..7a113927a 100644
--- a/src/test/java/org/folio/rest/api/LoanPoliciesApiTest.java
+++ b/src/test/java/org/folio/rest/api/LoanPoliciesApiTest.java
@@ -1,5 +1,6 @@
package org.folio.rest.api;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.folio.rest.support.builders.LoanPolicyRequestBuilder.defaultRollingPolicy;
import static org.folio.rest.support.builders.LoanPolicyRequestBuilder.emptyPolicy;
import static org.folio.rest.support.matchers.periodJsonObjectMatcher.matchesPeriod;
@@ -17,6 +18,7 @@
import static org.hamcrest.core.IsNull.notNullValue;
import java.net.MalformedURLException;
import java.net.URL;
+import java.net.URLEncoder;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -713,15 +715,15 @@ public void canSearchForLoanPolicyById()
String firstPolicyId = createLoanPolicy(defaultRollingPolicy().create()).getId();
String secondPolicyId = createLoanPolicy(defaultRollingPolicy().create()).getId();
- String queryTemplate = loanPolicyStorageUrl() + "?query=id=\"%s\"";
+ String queryTemplate = "id=\"%s\"";
CompletableFuture searchForFirstPolicyCompleted = new CompletableFuture<>();
CompletableFuture searchForSecondPolicyCompleted = new CompletableFuture<>();
- client.get(String.format(queryTemplate, firstPolicyId),
+ client.get(loanPolicyStorageUrl() + "?query=" + URLEncoder.encode(String.format(queryTemplate, firstPolicyId), UTF_8),
StorageTestSuite.TENANT_ID, ResponseHandler.json(searchForFirstPolicyCompleted));
- client.get(String.format(queryTemplate, secondPolicyId),
+ client.get(loanPolicyStorageUrl() + "?query=" + URLEncoder.encode(String.format(queryTemplate, secondPolicyId), UTF_8),
StorageTestSuite.TENANT_ID, ResponseHandler.json(searchForSecondPolicyCompleted));
JsonResponse firstPolicySearchResponse = searchForFirstPolicyCompleted.get(5, TimeUnit.SECONDS);
diff --git a/src/test/java/org/folio/rest/api/LoansApiTest.java b/src/test/java/org/folio/rest/api/LoansApiTest.java
index 19bc3f381..75fdd05c6 100644
--- a/src/test/java/org/folio/rest/api/LoansApiTest.java
+++ b/src/test/java/org/folio/rest/api/LoansApiTest.java
@@ -1,6 +1,7 @@
package org.folio.rest.api;
import static java.lang.Boolean.TRUE;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.folio.rest.support.matchers.HttpResponseStatusCodeMatchers.isBadRequest;
import static org.folio.rest.support.matchers.HttpResponseStatusCodeMatchers.isNotFound;
import static org.folio.rest.support.matchers.LoanMatchers.isClosed;
@@ -23,6 +24,7 @@
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
+import java.net.URLEncoder;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -1048,7 +1050,7 @@ public void canSearchByUserId()
UUID firstUserId = UUID.randomUUID();
UUID secondUserId = UUID.randomUUID();
- String queryTemplate = InterfaceUrls.loanStorageUrl() + "?query=userId=\"%s\"";
+ String queryTemplate = InterfaceUrls.loanStorageUrl() + "?query=userId='%s'";
loansClient.create(new LoanRequestBuilder().withUserId(firstUserId).create());
loansClient.create(new LoanRequestBuilder().withUserId(firstUserId).create());
@@ -1101,7 +1103,7 @@ public void canFilterByLoanStatus()
UUID userId = UUID.randomUUID();
- String queryTemplate = "query=userId=\"%s\"+and+status.name=\"%s\"";
+ String queryTemplate = "query=userId=%s+and+status.name=%s";
loansClient.create(loanRequest(userId, "Open"));
loansClient.create(loanRequest(userId, "Open"));
diff --git a/src/test/java/org/folio/rest/api/RequestExpirationApiTest.java b/src/test/java/org/folio/rest/api/RequestExpirationApiTest.java
index 26be7b172..ec1616237 100644
--- a/src/test/java/org/folio/rest/api/RequestExpirationApiTest.java
+++ b/src/test/java/org/folio/rest/api/RequestExpirationApiTest.java
@@ -1,24 +1,43 @@
package org.folio.rest.api;
import static org.folio.rest.api.RequestsApiTest.requestStorageUrl;
+import static org.folio.rest.api.StorageTestSuite.TENANT_ID;
+import static org.folio.rest.support.ResponseHandler.empty;
import static org.folio.rest.support.builders.RequestRequestBuilder.CLOSED_PICKUP_EXPIRED;
import static org.folio.rest.support.builders.RequestRequestBuilder.CLOSED_UNFILLED;
import static org.folio.rest.support.builders.RequestRequestBuilder.OPEN_AWAITING_DELIVERY;
import static org.folio.rest.support.builders.RequestRequestBuilder.OPEN_AWAITING_PICKUP;
import static org.folio.rest.support.builders.RequestRequestBuilder.OPEN_IN_TRANSIT;
import static org.folio.rest.support.builders.RequestRequestBuilder.OPEN_NOT_YET_FILLED;
+import static org.folio.rest.support.http.InterfaceUrls.requestExpirationUrl;
+import static org.folio.support.EventType.LOG_RECORD;
+import static org.folio.support.LogEventPayloadField.ORIGINAL;
+import static org.folio.support.LogEventPayloadField.PAYLOAD;
+import static org.folio.support.LogEventPayloadField.REQUESTS;
+import static org.folio.support.LogEventPayloadField.UPDATED;
+import static org.folio.support.MockServer.clearPublishedEvents;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is;
import java.net.MalformedURLException;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+
+import org.awaitility.Awaitility;
+import org.folio.rest.jaxrs.model.Event;
+import org.folio.rest.jaxrs.model.Request;
import org.folio.rest.support.ApiTests;
+import org.folio.rest.support.Response;
import org.folio.rest.support.builders.RequestRequestBuilder;
import org.folio.support.ExpirationTool;
+import org.folio.support.MockServer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
@@ -41,6 +60,7 @@ public void beforeEach()
@After
public void checkIdsAfterEach() {
StorageTestSuite.checkForMismatchedIDs(REQUEST_TABLE);
+ clearPublishedEvents();
}
@Test
@@ -65,6 +85,12 @@ public void canExpireASingleOpenUnfilledRequest()
expireRequests();
+ List events = Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(MockServer::getPublishedEvents, hasSize(1));
+
+ assertPublishedEvents(events);
+
JsonObject response = getById(requestStorageUrl(String.format("/%s", id)));
assertThat(response.getString("status"), is(CLOSED_UNFILLED));
@@ -94,6 +120,12 @@ public void canExpireASingleOpenAwaitingPickupRequest()
expireRequests();
+ List events = Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(MockServer::getPublishedEvents, hasSize(1));
+
+ assertPublishedEvents(events);
+
JsonObject response = getById(requestStorageUrl(String.format("/%s", id)));
assertThat(response.getString("status"), is(CLOSED_PICKUP_EXPIRED));
@@ -123,6 +155,12 @@ public void canExpireASingleOpenAwaitingDeliveryRequest()
expireRequests();
+ List events = Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(MockServer::getPublishedEvents, hasSize(1));
+
+ assertPublishedEvents(events);
+
JsonObject response = getById(requestStorageUrl(String.format("/%s", id)));
assertThat(response.getString("status"), is(CLOSED_UNFILLED));
@@ -179,6 +217,12 @@ public void canExpireAnFirstAwaitingPickupRequest()
expireRequests();
+ List events = Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(MockServer::getPublishedEvents, hasSize(1));
+
+ assertPublishedEvents(events);
+
JsonObject response1 = getById(requestStorageUrl(String.format("/%s", id1)));
JsonObject response2 = getById(requestStorageUrl(String.format("/%s", id2)));
JsonObject response3 = getById(requestStorageUrl(String.format("/%s", id3)));
@@ -243,6 +287,12 @@ public void canExpireFirstAwaitingDeliveryRequest()
expireRequests();
+ List events = Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(MockServer::getPublishedEvents, hasSize(1));
+
+ assertPublishedEvents(events);
+
JsonObject response1 = getById(requestStorageUrl(String.format("/%s", id1)));
JsonObject response2 = getById(requestStorageUrl(String.format("/%s", id2)));
JsonObject response3 = getById(requestStorageUrl(String.format("/%s", id3)));
@@ -600,6 +650,12 @@ public void canExpireOpenUnfilledRequestsInTheMiddleOfAQueue()
expireRequests();
+ List events = Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(MockServer::getPublishedEvents, hasSize(10));
+
+ assertPublishedEvents(events);
+
JsonObject response1_1 = getById(requestStorageUrl(String.format("/%s", id1_1)));
JsonObject response1_2 = getById(requestStorageUrl(String.format("/%s", id1_2)));
JsonObject response1_3 = getById(requestStorageUrl(String.format("/%s", id1_3)));
@@ -734,12 +790,24 @@ public void canExpireOpenAwaitingWithNoHoldShelfExpirationDate()
assertThat(response.getInteger("position"), is(1));
}
- private void expireRequests() throws InterruptedException, ExecutionException, TimeoutException {
- CompletableFuture expirationCompleted = new CompletableFuture<>();
+ private void expireRequests() throws InterruptedException, ExecutionException, TimeoutException, MalformedURLException {
+ final var createCompleted = new CompletableFuture();
+
+ client.post(requestExpirationUrl(), TENANT_ID, empty(createCompleted));
- ExpirationTool.doRequestExpiration(StorageTestSuite.getVertx())
- .onComplete(res -> expirationCompleted.complete(null));
+ final var postResponse = createCompleted.get(5, TimeUnit.SECONDS);
+
+ assertThat(postResponse.getStatusCode(), is(204));
+ }
- expirationCompleted.get(5, TimeUnit.SECONDS);
+ private void assertPublishedEvents(List events) {
+ events.forEach(e -> {
+ Event event = e.mapTo(Event.class);
+ assertThat(event.getEventType(), is(LOG_RECORD.name()));
+ JsonObject payload = new JsonObject(event.getEventPayload()).getJsonObject(PAYLOAD.value());
+ Request original = payload.getJsonObject(REQUESTS.value()).getJsonObject(ORIGINAL.value()).mapTo(Request.class);
+ Request updated = payload.getJsonObject(REQUESTS.value()).getJsonObject(UPDATED.value()).mapTo(Request.class);
+ assertThat(original.getStatus(), not(equalTo(updated.getStatus())));
+ });
}
}
diff --git a/src/test/java/org/folio/rest/api/RequestsApiTest.java b/src/test/java/org/folio/rest/api/RequestsApiTest.java
index 421f2792f..4e1bb1614 100644
--- a/src/test/java/org/folio/rest/api/RequestsApiTest.java
+++ b/src/test/java/org/folio/rest/api/RequestsApiTest.java
@@ -1,6 +1,7 @@
package org.folio.rest.api;
import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static org.folio.rest.api.StorageTestSuite.TENANT_ID;
import static org.folio.rest.support.builders.RequestRequestBuilder.CLOSED_CANCELLED;
@@ -1208,7 +1209,8 @@ public void canSearchRequestsByUserProxyId()
CompletableFuture getRequestsCompleted2 = new CompletableFuture<>();
- client.get(requestStorageUrl() + String.format("?query=proxyUserId<>%s", UUID.randomUUID().toString()),
+ String query = String.format("proxyUserId<>%s", UUID.randomUUID().toString());
+ client.get(requestStorageUrl() + "?query=" + URLEncoder.encode(query, UTF_8),
TENANT_ID, ResponseHandler.json(getRequestsCompleted2));
JsonResponse getRequestsResponse2 = getRequestsCompleted2.get(5, TimeUnit.SECONDS);
diff --git a/src/test/java/org/folio/rest/api/StaffSlipsApiTest.java b/src/test/java/org/folio/rest/api/StaffSlipsApiTest.java
index b483c679e..e75483d36 100644
--- a/src/test/java/org/folio/rest/api/StaffSlipsApiTest.java
+++ b/src/test/java/org/folio/rest/api/StaffSlipsApiTest.java
@@ -1,5 +1,6 @@
package org.folio.rest.api;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.folio.rest.support.matchers.HttpResponseStatusCodeMatchers.*;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.collection.IsArrayContainingInAnyOrder.arrayContainingInAnyOrder;
@@ -9,6 +10,7 @@
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
+import java.net.URLEncoder;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -150,7 +152,7 @@ public void canQueryStaffSlip()
String slipId = creationResponse.getJson().getString(ID_KEY);
- URL path = staffSlipsStorageUrl(String.format("?query=%s==\"%s\"", ID_KEY, slipId));
+ URL path = staffSlipsStorageUrl("?query=" + URLEncoder.encode(String.format("%s==\"%s\"", ID_KEY, slipId), UTF_8));
client.get(path, StorageTestSuite.TENANT_ID, ResponseHandler.json(getCompleted));
diff --git a/src/test/java/org/folio/rest/api/StorageTestSuite.java b/src/test/java/org/folio/rest/api/StorageTestSuite.java
index beee50887..9c3008cd4 100644
--- a/src/test/java/org/folio/rest/api/StorageTestSuite.java
+++ b/src/test/java/org/folio/rest/api/StorageTestSuite.java
@@ -1,7 +1,19 @@
package org.folio.rest.api;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.any;
+import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
+import static org.folio.support.EventType.LOG_RECORD;
+import static org.folio.support.MockServer.getCreatedEventTypes;
+import static org.folio.support.MockServer.getRegisteredPublishers;
+import static org.folio.support.MockServer.getRegisteredSubscribers;
+import static org.folio.util.pubsub.PubSubClientUtils.constructModuleName;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.junit.MatcherAssert.assertThat;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -11,14 +23,18 @@
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.InvalidParameterException;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.github.tomakehurst.wiremock.WireMockServer;
import org.folio.rest.RestVerticle;
import org.folio.rest.api.loans.LoansAnonymizationApiTest;
import org.folio.rest.api.migration.StaffSlipsMigrationScriptTest;
+import org.folio.rest.jaxrs.model.EventDescriptor;
+import org.folio.rest.jaxrs.model.PublisherDescriptor;
import org.folio.rest.persist.Criteria.Criterion;
import org.folio.rest.persist.PostgresClient;
import org.folio.rest.support.OkapiHttpClient;
@@ -26,6 +42,7 @@
import org.folio.rest.support.ResponseHandler;
import org.folio.rest.support.TextResponse;
import org.folio.rest.tools.utils.NetworkUtils;
+import org.folio.support.MockServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
@@ -70,8 +87,12 @@ public class StorageTestSuite {
public static final String TENANT_ID = "test_tenant";
private static Vertx vertx;
- public static final int PORT = NetworkUtils.nextFreePort();
+ public static final int VERTICLE_PORT = NetworkUtils.nextFreePort();
+ public static final int PROXY_PORT = NetworkUtils.nextFreePort();
+ public static final int OKAPI_MOCK_PORT = NetworkUtils.nextFreePort();
private static boolean initialised = false;
+ private static MockServer mockServer;
+ private static final WireMockServer wireMockServer = new WireMockServer(PROXY_PORT);
/**
* Return a URL for the path and the parameters.
@@ -81,7 +102,7 @@ public class StorageTestSuite {
*/
public static URL storageUrl(String path, String ... parameterKeyValue) throws MalformedURLException {
if (parameterKeyValue.length == 0) {
- return new URL("http", "localhost", PORT, path);
+ return new URL("http", "localhost", PROXY_PORT, path);
}
if (parameterKeyValue.length % 2 == 1) {
throw new InvalidParameterException("Expected even number of key/value strings, found "
@@ -96,7 +117,7 @@ public static URL storageUrl(String path, String ... parameterKeyValue) throws M
.append('=')
.append(URLEncoder.encode(parameterKeyValue[i+1], StandardCharsets.UTF_8.name()));
}
- return new URL("http", "localhost", PORT, completePath.toString());
+ return new URL("http", "localhost", PROXY_PORT, completePath.toString());
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
@@ -156,11 +177,24 @@ public static void before()
DeploymentOptions options = new DeploymentOptions();
- options.setConfig(new JsonObject().put("http.port", PORT));
+ options.setConfig(new JsonObject().put("http.port", VERTICLE_PORT));
options.setWorker(true);
startVerticle(options);
+ mockServer = new MockServer(OKAPI_MOCK_PORT, vertx);
+ mockServer.start();
+
+ wireMockServer.start();
+
+ wireMockServer.stubFor(post(urlMatching("/pubsub/.*"))
+ .atPriority(1)
+ .willReturn(aResponse().proxiedFrom("http://localhost:" + OKAPI_MOCK_PORT)));
+
+ wireMockServer.stubFor(any(anyUrl())
+ .atPriority(10)
+ .willReturn(aResponse().proxiedFrom("http://localhost:" + VERTICLE_PORT)));
+
prepareTenant(TENANT_ID);
initialised = true;
@@ -176,6 +210,8 @@ public static void after()
removeTenant(TENANT_ID);
+ mockServer.close();
+
CompletableFuture undeploymentComplete = new CompletableFuture<>();
vertx.close(res -> {
@@ -316,6 +352,22 @@ private static void prepareTenant(String tenantId) {
assertThat(failureMessage, response.getStatusCode(), is(201));
+ List eventTypes = getCreatedEventTypes();
+ assertThat(eventTypes, hasSize(1));
+ EventDescriptor descriptor = eventTypes.get(0).mapTo(EventDescriptor.class);
+ assertThat(descriptor.getEventType(), equalTo(LOG_RECORD.name()));
+
+ List publishers = getRegisteredPublishers();
+ assertThat(publishers, hasSize(1));
+ PublisherDescriptor publisher = publishers.get(0).mapTo(PublisherDescriptor.class);
+ assertThat(publisher.getModuleId(), equalTo(constructModuleName()));
+
+ assertThat(publisher.getEventDescriptors(), hasSize(1));
+ assertThat(publisher.getEventDescriptors().get(0).getEventType(), equalTo(LOG_RECORD.name()));
+
+ List subscribers = getRegisteredSubscribers();
+ assertThat(subscribers, hasSize(0));
+
} catch (Exception e) {
log.error("Tenant preparation failed: " + e.getMessage(), e);
assert false;
diff --git a/src/test/java/org/folio/rest/support/ApiTests.java b/src/test/java/org/folio/rest/support/ApiTests.java
index a6c171f57..2d9dceacd 100644
--- a/src/test/java/org/folio/rest/support/ApiTests.java
+++ b/src/test/java/org/folio/rest/support/ApiTests.java
@@ -6,6 +6,7 @@
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import java.net.HttpURLConnection;
import java.net.URL;
@@ -22,6 +23,7 @@
public class ApiTests {
private static boolean runningOnOwn;
+
protected final OkapiHttpClient client = new OkapiHttpClient(StorageTestSuite.getVertx());
@BeforeClass
diff --git a/src/test/java/org/folio/rest/support/http/InterfaceUrls.java b/src/test/java/org/folio/rest/support/http/InterfaceUrls.java
index 31f026fd5..2145779a3 100644
--- a/src/test/java/org/folio/rest/support/http/InterfaceUrls.java
+++ b/src/test/java/org/folio/rest/support/http/InterfaceUrls.java
@@ -31,4 +31,8 @@ public static URL anonymizeLoansURL() throws MalformedURLException {
public static URL checkInsStorageUrl(String subPath) throws MalformedURLException {
return StorageTestSuite.storageUrl("/check-in-storage/check-ins" + subPath);
}
+
+ public static URL requestExpirationUrl() throws MalformedURLException {
+ return StorageTestSuite.storageUrl("/scheduled-request-expiration");
+ }
}
diff --git a/src/test/java/org/folio/rest/support/spring/TestContextConfiguration.java b/src/test/java/org/folio/rest/support/spring/TestContextConfiguration.java
index 17189df87..f15039a91 100644
--- a/src/test/java/org/folio/rest/support/spring/TestContextConfiguration.java
+++ b/src/test/java/org/folio/rest/support/spring/TestContextConfiguration.java
@@ -30,7 +30,7 @@ public class TestContextConfiguration {
public OkapiHeaders okapiHeaders() {
return OkapiHeaders.builder()
.tenantId(TENANT_ID)
- .url("http://localhost:" + StorageTestSuite.PORT)
+ .url("http://localhost:" + StorageTestSuite.PROXY_PORT)
.token(generateToken())
.userId(USER_ID)
.build();
diff --git a/src/test/java/org/folio/support/MockServer.java b/src/test/java/org/folio/support/MockServer.java
new file mode 100644
index 000000000..4213f4599
--- /dev/null
+++ b/src/test/java/org/folio/support/MockServer.java
@@ -0,0 +1,145 @@
+package org.folio.support;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.BodyHandler;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Fail.fail;
+import static org.folio.HttpStatus.HTTP_CREATED;
+import static org.folio.HttpStatus.HTTP_NO_CONTENT;
+
+public class MockServer extends AbstractVerticle {
+
+ private static final List publishedEvents = new ArrayList<>();
+ private static final List createdEventTypes = new ArrayList<>();
+ private static final List registeredPublishers = new ArrayList<>();
+ private static final List registeredSubscribers = new ArrayList<>();
+ private static final List deletedEventTypes = new ArrayList<>();
+
+ private static final Logger logger = LoggerFactory.getLogger(MockServer.class);
+
+ private final int port;
+ private final Vertx vertx;
+
+ public MockServer(int port, Vertx vertx) {
+ this.port = port;
+ this.vertx = vertx;
+ }
+
+ public void start() throws InterruptedException, ExecutionException, TimeoutException {
+ HttpServer server = vertx.createHttpServer();
+ CompletableFuture deploymentComplete = new CompletableFuture<>();
+ server.requestHandler(register()).listen(port, result -> {
+ if(result.succeeded()) {
+ deploymentComplete.complete(result.result());
+ }
+ else {
+ deploymentComplete.completeExceptionally(result.cause());
+ }
+ });
+ deploymentComplete.get(30, TimeUnit.SECONDS);
+ }
+
+ public void close() {
+ vertx.close(res -> {
+ if (res.failed()) {
+ logger.error("Failed to shut down mock server", res.cause());
+ fail(res.cause().getMessage());
+ } else {
+ logger.info("Successfully shut down mock server");
+ }
+ });
+ }
+
+ public Router register() {
+
+ Router router = Router.router(vertx);
+
+ router.route().handler(BodyHandler.create());
+
+ router.post("/pubsub/publish")
+ .handler(routingContext -> {
+ publishedEvents.add(routingContext.getBodyAsJson());
+ routingContext.response()
+ .setStatusCode(HTTP_NO_CONTENT.toInt())
+ .end();
+ });
+
+ router.post("/pubsub/event-types")
+ .handler(ctx -> postTenant(ctx, createdEventTypes));
+
+ router.post("/pubsub/event-types/declare/publisher")
+ .handler(ctx -> postTenant(ctx, registeredPublishers));
+
+ router.post("/pubsub/event-types/declare/subscriber")
+ .handler(ctx -> postTenant(ctx, registeredSubscribers));
+
+ router.delete("/pubsub/event-types/:eventTypeName/publishers")
+ .handler(MockServer::deleteTenant);
+
+ return router;
+ }
+
+ private static void postTenant(RoutingContext routingContext,
+ List requestBodyList) {
+
+ if (requestBodyList != null) {
+ requestBodyList.add(routingContext.getBodyAsJson());
+ }
+ String json = routingContext.getBodyAsJson().encodePrettily();
+ Buffer buffer = Buffer.buffer(json, "UTF-8");
+ routingContext.response()
+ .setStatusCode(HTTP_CREATED.toInt())
+ .putHeader("content-type", "application/json; charset=utf-8")
+ .putHeader("content-length", Integer.toString(buffer.length()))
+ .write(buffer)
+ .end();
+ }
+
+ private static void deleteTenant(RoutingContext routingContext) {
+ deletedEventTypes.add(Arrays.asList(routingContext.normalisedPath().split("/")).get(3));
+
+ routingContext.response()
+ .setStatusCode(HTTP_NO_CONTENT.toInt())
+ .end();
+ }
+
+ public static List getPublishedEvents() {
+ return publishedEvents;
+ }
+
+ public static List getCreatedEventTypes() {
+ return createdEventTypes;
+ }
+
+ public static List getRegisteredPublishers() {
+ return registeredPublishers;
+ }
+
+ public static List getRegisteredSubscribers() {
+ return registeredSubscribers;
+ }
+
+ public static List getDeletedEventTypes() {
+ return deletedEventTypes;
+ }
+
+ public static void clearPublishedEvents() {
+ publishedEvents.clear();
+ }
+}
diff --git a/src/test/resources/MessagingDescriptor.json b/src/test/resources/MessagingDescriptor.json
new file mode 100644
index 000000000..b1981c677
--- /dev/null
+++ b/src/test/resources/MessagingDescriptor.json
@@ -0,0 +1,12 @@
+{
+ "publications": [
+ {
+ "eventType": "LOG_RECORD",
+ "description": "Created log record event",
+ "eventTTL": 1,
+ "signed": false
+ }
+ ],
+ "subscriptions": [
+ ]
+}