Skip to content

Commit

Permalink
CIRCSTORE-247 - Request log actions (#272)
Browse files Browse the repository at this point in the history
(cherry picked from commit 641c632)
  • Loading branch information
siarhei-charniak committed Nov 19, 2020
1 parent 3dc34c4 commit 509dc81
Show file tree
Hide file tree
Showing 27 changed files with 759 additions and 42 deletions.
43 changes: 42 additions & 1 deletion descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
{
"id": "${artifactId}-${version}",
"name": "Circulation Storage Module",
"requires": [
{
"id": "pubsub-event-types",
"version": "0.1"
},
{
"id": "pubsub-publishers",
"version": "0.1"
},
{
"id": "pubsub-subscribers",
"version": "0.1"
},
{
"id": "pubsub-publish",
"version": "0.1"
}
],
"provides": [
{
"id": "request-storage-batch",
Expand Down Expand Up @@ -379,7 +397,12 @@
"handlers": [
{
"methods": ["POST"],
"pathPattern": "/_/tenant"
"pathPattern": "/_/tenant",
"modulePermissions": [
"pubsub.event-types.post",
"pubsub.publishers.post",
"pubsub.subscribers.post"
]
}, {
"methods": ["DELETE"],
"pathPattern": "/_/tenant"
Expand Down Expand Up @@ -449,6 +472,24 @@
]
}
]
},
{
"id": "_timer",
"version": "1.0",
"interfaceType": "system",
"handlers": [
{
"methods": [
"POST"
],
"pathPattern": "/request-expiration/scheduled-request-expiration",
"modulePermissions": [
"pubsub.publish.post"
],
"unit": "minute",
"delay": "2"
}
]
}
],
"permissionSets": [
Expand Down
30 changes: 29 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.web</groupId>
Expand Down Expand Up @@ -114,6 +113,35 @@
<version>${rest-assured.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>mod-pubsub-client</artifactId>
<version>1.3.1</version>
<exclusions>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>2.24.0</version>
<exclusions>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
</dependencies>

<properties>
Expand Down
40 changes: 26 additions & 14 deletions src/main/java/org/folio/rest/impl/RequestExpiryImpl.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@

package org.folio.rest.impl;

import static org.folio.rest.RestVerticle.MODULE_SPECIFIC_ARGS;
import static io.vertx.core.Future.succeededFuture;
import static org.folio.rest.jaxrs.resource.ScheduledRequestExpiration.ScheduledRequestExpirationResponse.respond204;
import static org.folio.rest.jaxrs.resource.ScheduledRequestExpiration.ScheduledRequestExpirationResponse.respond500WithTextPlain;
import static org.folio.support.ExpirationTool.doRequestExpiration;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.util.Map;

import org.folio.rest.resource.interfaces.PeriodicAPI;
import org.folio.support.ExpirationTool;
import javax.ws.rs.core.Response;

public class RequestExpiryImpl implements PeriodicAPI {
import org.folio.rest.jaxrs.resource.ScheduledRequestExpiration;

@Override
public long runEvery() {
String intervalString = MODULE_SPECIFIC_ARGS.getOrDefault("request.expire.interval",
"120000");
return Long.parseLong(intervalString);
}
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;

public class RequestExpiryImpl implements ScheduledRequestExpiration {

@Override
public void run(Vertx vertx, Context context) {
context.runOnContext(v -> ExpirationTool.doRequestExpiration(vertx));
public void expireRequests(Map<String, String> okapiHeaders,
Handler<AsyncResult<Response>> asyncResultHandler, Context context) {

context.runOnContext(v -> doRequestExpiration(okapiHeaders, context.owner())
.onComplete(result -> {
if (result.succeeded()) {
asyncResultHandler.handle(succeededFuture(respond204()));
} else {
asyncResultHandler.handle(
succeededFuture(respond500WithTextPlain(result.cause().getMessage())));
}
})
);
}
}
13 changes: 10 additions & 3 deletions src/main/java/org/folio/rest/impl/TenantRefAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import org.folio.rest.tools.utils.TenantLoading;
import org.folio.service.PubSubRegistrationService;

public class TenantRefAPI extends TenantAPI {
private static final Logger log = LoggerFactory.getLogger(TenantRefAPI.class);
Expand Down Expand Up @@ -44,11 +45,17 @@ public void postTenant(TenantAttributes ta, Map<String, String> headers,
.respond500WithTextPlain(res1.cause().getLocalizedMessage())));
return;
}
hndlr.handle(io.vertx.core.Future.succeededFuture(PostTenantResponse
.respond201WithApplicationJson("")));
PubSubRegistrationService.registerModule(headers, vertx)
.whenComplete((aBoolean, throwable) ->
hndlr.handle(io.vertx.core.Future.succeededFuture(PostTenantResponse
.respond201WithApplicationJson(""))));
});
}, cntxt);
}


@Override
public void deleteTenant(Map<String, String> headers, Handler<AsyncResult<Response>> handlers, Context cntx) {
PubSubRegistrationService.unregisterModule(headers, cntx.owner())
.thenRun(() -> super.deleteTenant(headers, handlers, cntx));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.folio.rest.jaxrs.resource;

import java.util.Map;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;

import org.folio.rest.jaxrs.resource.support.ResponseDelegate;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;

import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
import static javax.ws.rs.core.MediaType.TEXT_PLAIN;

@Path("/scheduled-request-expiration")
public interface ScheduledRequestExpiration {

@POST
@Produces("text/plain")
void expireRequests(Map<String, String> okapiHeaders,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext);

class ScheduledRequestExpirationResponse extends ResponseDelegate {
private ScheduledRequestExpirationResponse(Response response, Object entity) {
super(response, entity);
}

private ScheduledRequestExpirationResponse(Response response) {
super(response);
}

public static ScheduledRequestExpirationResponse respond204() {
Response.ResponseBuilder responseBuilder = Response.status(204);
return new ScheduledRequestExpirationResponse(responseBuilder.build());
}

public static ScheduledRequestExpirationResponse respond500WithTextPlain(String reason) {
Response.ResponseBuilder responseBuilder = Response.status(500).header(CONTENT_TYPE, TEXT_PLAIN);
responseBuilder.entity(reason);

return new ScheduledRequestExpirationResponse(responseBuilder.build(), reason);
}
}
}
34 changes: 34 additions & 0 deletions src/main/java/org/folio/service/EventPublisherService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.folio.service;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import org.folio.support.EventType;
import org.folio.support.JsonPropertyWriter;
import org.folio.support.LogEventPayloadField;
import org.folio.support.exception.LogEventType;

import java.util.Map;

import static org.folio.support.JsonPropertyWriter.write;
import static org.folio.support.LogEventPayloadField.LOG_EVENT_TYPE;

public class EventPublisherService {

private final PubSubPublishingService pubSubPublishingService;

public EventPublisherService(Vertx vertx, Map<String, String> okapiHeaders) {
pubSubPublishingService = new PubSubPublishingService(vertx, okapiHeaders);
}

public Future<Void> publishLogRecord(JsonObject context, LogEventType payloadType) {
context = new JsonObject().put(LogEventPayloadField.PAYLOAD.value(), context);
write(context, LOG_EVENT_TYPE.value(), payloadType.value());
Promise<Void> promise = Promise.promise();
pubSubPublishingService.publishEvent(EventType.LOG_RECORD.name(), context.encode())
.thenAccept(r -> promise.complete());
return promise.future();
}

}
67 changes: 67 additions & 0 deletions src/main/java/org/folio/service/PubSubPublishingService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.folio.service;

import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.folio.rest.jaxrs.model.Event;
import org.folio.rest.jaxrs.model.EventMetadata;
import org.folio.rest.util.OkapiConnectionParams;
import org.folio.util.pubsub.PubSubClientUtils;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

class PubSubPublishingService {
private static final Logger logger = LoggerFactory.getLogger(PubSubPublishingService.class);

private final Map<String, String> okapiHeaders;
private final Context context;

public PubSubPublishingService(Vertx vertx, Map<String, String> okapiHeaders) {
context = vertx.getOrCreateContext();
this.okapiHeaders = okapiHeaders;
}

public CompletableFuture<Boolean> publishEvent(String eventType, String payload) {
Event event = new Event().withId(UUID.randomUUID()
.toString())
.withEventType(eventType)
.withEventPayload(payload)
.withEventMetadata(new EventMetadata().withPublishedBy(PubSubClientUtils.constructModuleName())
.withTenantId(okapiHeaders.get(OKAPI_TENANT_HEADER))
.withEventTTL(1));

final CompletableFuture<Boolean> publishResult = new CompletableFuture<>();
OkapiConnectionParams params = new OkapiConnectionParams();
params.setOkapiUrl(okapiHeaders.get(OKAPI_URL_HEADER));
params.setTenantId(okapiHeaders.get(OKAPI_TENANT_HEADER));
params.setToken(okapiHeaders.get(OKAPI_TOKEN_HEADER));

context.runOnContext(v -> PubSubClientUtils.sendEventMessage(event, params)
.whenComplete((result, throwable) -> {
if (Boolean.TRUE.equals(result)) {
logger.debug("Event published successfully. ID: {}, type: {}, payload: {}", event.getId(), event.getEventType(),
event.getEventPayload());
publishResult.complete(true);
} else {
logger.error("Failed to publish event. ID: {}, type: {}, payload: {}", throwable, event.getId(), event.getEventType(),
event.getEventPayload());

if (throwable == null) {
publishResult.complete(false);
} else {
publishResult.completeExceptionally(throwable);
}
}
}));

return publishResult;
}
}
Loading

0 comments on commit 509dc81

Please sign in to comment.